This is an automated email from the ASF dual-hosted git repository. wuzhiguo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ambari-metrics.git
The following commit(s) were added to refs/heads/master by this push: new 8a6e6f7 AMBARI-25402: Please provide jvm metrics for kafka components in Ambari (#52) 8a6e6f7 is described below commit 8a6e6f71913b6141fc8be8728e29bd5286b57da4 Author: lucasbak <lucas.bakal...@gmail.com> AuthorDate: Mon Nov 14 17:55:58 2022 +0100 AMBARI-25402: Please provide jvm metrics for kafka components in Ambari (#52) --- .../hadoop/metrics2/sink/kafka/JvmMetricSet.java | 179 +++++++++++++++++++++ .../sink/kafka/KafkaTimelineMetricsReporter.java | 43 ++--- .../metrics2/sink/kafka/MetricNameBuilder.java | 85 ++++++++++ .../metrics2/sink/kafka/JvmMetricSetTest.java | 48 ++++++ 4 files changed, 335 insertions(+), 20 deletions(-) diff --git a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/JvmMetricSet.java b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/JvmMetricSet.java new file mode 100644 index 0000000..907c8a7 --- /dev/null +++ b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/JvmMetricSet.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.sink.kafka; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.RuntimeMXBean; +import java.lang.management.ThreadMXBean; +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.util.RatioGauge; + +public class JvmMetricSet { + + private static final String MEMORY = "memory"; + private static final String THREADS = "threads"; + private static final String RUNTIME = "runtime"; + + private static final JvmMetricSet INSTANCE = new JvmMetricSet(); + + + public static JvmMetricSet getInstance() { + return INSTANCE; + } + + private final MemoryMXBean memoryMXBean; + private final ThreadMXBean threadMXBean; + private final RuntimeMXBean runtimeMXBean; + + private static class JvmMetric { + private final MetricName metricName; + private final Gauge<?> metric; + + JvmMetric(MetricName metricName, Gauge<?> metric) { + this.metricName = metricName; + this.metric = metric; + } + + MetricName getMetricName() { + return metricName; + } + + Gauge<?> getMetric() { + return metric; + } + } + + + private JvmMetricSet() { + this(ManagementFactory.getMemoryMXBean(), ManagementFactory.getThreadMXBean(), + ManagementFactory.getRuntimeMXBean()); + + } + + private JvmMetricSet(MemoryMXBean memoryMXBean, ThreadMXBean threadMXBean, RuntimeMXBean runtimeMXBean) { + this.memoryMXBean = memoryMXBean; + this.threadMXBean = threadMXBean; + this.runtimeMXBean = runtimeMXBean; + } + + public Map<MetricName, Gauge<?>> getJvmMetrics() { + return Stream.concat( + getMemoryUsageMetrics().stream(), + Stream.concat( + getThreadMetrics().stream(), + Stream.of(getRuntimeMetrics()) + )) + .collect(Collectors.toMap(JvmMetric::getMetricName, JvmMetric::getMetric)); + } + + private List<JvmMetric> getMemoryUsageMetrics() { + + return Stream.of( + new AbstractMap.SimpleEntry<>("heap_usage", memoryMXBean.getHeapMemoryUsage()), + new AbstractMap.SimpleEntry<>("non_heap_usage", memoryMXBean.getNonHeapMemoryUsage())) + .map(entry -> + new JvmMetric( + MetricNameBuilder.builder().type(MEMORY).name(entry.getKey()).build(), + new RatioGauge() { + + @Override + protected double getNumerator() { + return entry.getValue().getUsed(); + } + + @Override + protected double getDenominator() { + return entry.getValue().getMax(); + } + } + )) + .collect(Collectors.toList()); + + } + + private List<JvmMetric> getThreadMetrics() { + + return + Stream.concat( + Stream.of( + new JvmMetric( + MetricNameBuilder.builder().type(THREADS).name("thread_count").build(), + new Gauge<Integer>() { + @Override + public Integer value() { + return threadMXBean.getThreadCount(); + } + } + ), + new JvmMetric( + MetricNameBuilder.builder().type(THREADS).name("daemon_thread_count").build(), + new Gauge<Integer>() { + @Override + public Integer value() { + return threadMXBean.getDaemonThreadCount(); + } + } + )), + Stream + .of(Thread.State.RUNNABLE, Thread.State.BLOCKED, Thread.State.TIMED_WAITING, Thread.State.TERMINATED) + .map(state -> new JvmMetric( + MetricNameBuilder.builder().type(THREADS).name(getThreadMetricNameByState(state)).build(), + new Gauge<Long>() { + @Override + public Long value() { + return getThreadCountByState(state); + } + } + ))) + .collect(Collectors.toList()); + } + + private String getThreadMetricNameByState(@Nonnull Thread.State state) { + return String.format("thread-states.%s", state.name().toLowerCase()); + } + + private long getThreadCountByState(@Nonnull Thread.State state) { + return Arrays.stream(threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 0)) + .filter(threadInfo -> threadInfo.getThreadState().equals(state)) + .count(); + } + + private JvmMetric getRuntimeMetrics() { + return new JvmMetric( + MetricNameBuilder.builder().type(RUNTIME).name("uptime").build(), + new Gauge<Long>() { + @Override + public Long value() { + return runtimeMXBean.getUptime(); + } + } + ); + } + +} + diff --git a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java index 782b8d2..52f1c70 100644 --- a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java +++ b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java @@ -18,6 +18,24 @@ package org.apache.hadoop.metrics2.sink.kafka; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.ClassUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; import com.yammer.metrics.Metrics; import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.Gauge; @@ -33,25 +51,6 @@ import com.yammer.metrics.stats.Snapshot; import kafka.metrics.KafkaMetricsConfig; import kafka.metrics.KafkaMetricsReporter; import kafka.utils.VerifiableProperties; -import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.lang.ClassUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.TimeUnit; import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.MetricType; import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS; import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT; @@ -315,6 +314,10 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink protected TimelineScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) { super(registry, name, rateUnit, durationUnit); + + JvmMetricSet.getInstance() + .getJvmMetrics() + .forEach(registry::newGauge); } @Override @@ -521,4 +524,4 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink } } -} +} \ No newline at end of file diff --git a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/MetricNameBuilder.java b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/MetricNameBuilder.java new file mode 100644 index 0000000..3507d2d --- /dev/null +++ b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/MetricNameBuilder.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.sink.kafka; + +import java.util.regex.Pattern; + +import com.yammer.metrics.core.MetricName; + +import static java.util.Optional.ofNullable; + +public class MetricNameBuilder { + private static final Pattern WHITESPACE = Pattern.compile("[\\s]+"); + + private String group = "jvm"; + private String type; + private String name; + + private final String replacement; + + static MetricNameBuilder builder() { + return new MetricNameBuilder(); + } + + MetricNameBuilder() { + this(null); + } + + MetricNameBuilder(String replacement) { + this.replacement = ofNullable(replacement).orElse("_"); + } + + public MetricNameBuilder group(String group) { + this.group = group; + return this; + } + + public MetricNameBuilder type(String type) { + this.type = replaceWhiteSpaces(type); + return this; + } + + public MetricNameBuilder name(String name) { + this.name = ofNullable(this.name).orElse("") + replaceWhiteSpaces(name); + return this; + } + + private String replaceWhiteSpaces(String value) { + return ofNullable(value) + .map(val -> WHITESPACE.matcher(val).replaceAll(replacement)) + .orElse(""); + } + + public MetricName build() { + return new MetricName(this.group, type, name, null, createMBeanName()); + } + + private String createMBeanName() { + final StringBuilder builder = new StringBuilder(); + builder.append(group); + builder.append(":type="); + builder.append(type); + if (name.length() > 0) { + builder.append(",name="); + builder.append(name); + } + return builder.toString(); + } + +} \ No newline at end of file diff --git a/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/JvmMetricSetTest.java b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/JvmMetricSetTest.java new file mode 100644 index 0000000..f4c242b --- /dev/null +++ b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/JvmMetricSetTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.sink.kafka; + +import java.util.Map; +import org.junit.Test; +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.MetricName; +import static java.util.stream.Collectors.toList; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.matchers.JUnitMatchers.hasItems; + +public class JvmMetricSetTest { + + @Test + public void testGetJvmMetrics() { + + Map<MetricName, Gauge<?>> result = JvmMetricSet.getInstance().getJvmMetrics(); + + assertNotNull(result); + assertFalse(result.isEmpty()); + assertThat( + result.keySet() + .stream() + .map(MetricName::getName) + .collect(toList()), + hasItems("heap_usage", "thread-states.blocked", "thread-states.timed_waiting", "uptime")); + } + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@ambari.apache.org For additional commands, e-mail: commits-h...@ambari.apache.org