Repository: ambari Updated Branches: refs/heads/trunk 211a48d56 -> f3659cce6
Capture HDFS metrics per RPC port number in AMS and Grafana. (swagle) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f3659cce Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f3659cce Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f3659cce Branch: refs/heads/trunk Commit: f3659cce6946895bc8fdda0bfb1232643d49223a Parents: 211a48d Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Fri Mar 4 11:57:22 2016 -0800 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Fri Mar 4 11:57:28 2016 -0800 ---------------------------------------------------------------------- .../timeline/AbstractTimelineMetricsSink.java | 6 +- .../timeline/HadoopTimelineMetricsSink.java | 112 +++++++---- .../timeline/HadoopTimelineMetricsSinkTest.java | 184 +++++++++++++++++-- .../server/api/services/AmbariMetaInfo.java | 96 ++++++---- .../metrics/timeline/AMSPropertyProvider.java | 27 ++- .../controller/utilities/PropertyHelper.java | 77 ++++++++ .../server/upgrade/UpgradeCatalog222.java | 90 +++++++++ .../common-services/HDFS/2.1.0.2.0/widgets.json | 58 ++++-- .../2.0.6/hooks/before-START/scripts/params.py | 57 ++++++ .../templates/hadoop-metrics2.properties.j2 | 11 ++ .../StackArtifactResourceProviderTest.java | 50 ++++- .../utilities/PropertyHelperTest.java | 14 ++ .../server/upgrade/UpgradeCatalog222Test.java | 89 +++++++-- 13 files changed, 756 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index b2810b7..28d3b9c 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -47,6 +47,9 @@ public abstract class AbstractTimelineMetricsSink { public static final String COLLECTOR_PROPERTY = "collector"; public static final int DEFAULT_POST_TIMEOUT_SECONDS = 10; public static final String SKIP_COUNTER_TRANSFROMATION = "skipCounterDerivative"; + public static final String RPC_METRIC_PREFIX = "metric.rpc"; + public static final String RPC_METRIC_NAME_SUFFIX = "suffix"; + public static final String RPC_METRIC_PORT_SUFFIX = "port"; public static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics"; @@ -78,8 +81,7 @@ public abstract class AbstractTimelineMetricsSink { HttpURLConnection connection = null; try { if (connectUrl == null) { - throw new IOException("Unknown URL. " + - "Unable to connect to metrics collector."); + throw new IOException("Unknown URL. Unable to connect to metrics collector."); } String jsonData = mapper.writeValueAsString(metrics); connection = connectUrl.startsWith("https") ? 
http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java index 6da9257..db8791f 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java @@ -17,31 +17,31 @@ */ package org.apache.hadoop.metrics2.sink.timeline; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.apache.commons.configuration.SubsetConfiguration; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricType; import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.MetricsSink; import org.apache.hadoop.metrics2.MetricsTag; -import org.apache.hadoop.metrics2.MetricType; import org.apache.hadoop.metrics2.impl.MsInfo; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; import org.apache.hadoop.metrics2.util.Servers; import org.apache.hadoop.net.DNS; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; 
+import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + @InterfaceAudience.Public @InterfaceStability.Evolving public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink implements MetricsSink { @@ -54,9 +54,13 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple private static final String SERVICE_NAME_PREFIX = "serviceName-prefix"; private static final String SERVICE_NAME = "serviceName"; private int timeoutSeconds = 10; + private SubsetConfiguration conf; + // Cache the rpc port used and the suffix to use if the port tag is found + private Map<String, String> rpcPortSuffixes = new HashMap<>(10); @Override public void init(SubsetConfiguration conf) { + this.conf = conf; LOG.info("Initializing Timeline metrics sink."); // Take the hostname from the DNS class. @@ -83,8 +87,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple if (metricsServers == null || metricsServers.isEmpty()) { LOG.error("No Metric collector configured."); } else { - collectorUri = conf.getString(COLLECTOR_PROPERTY).trim() - + WS_V1_TIMELINE_METRICS; + collectorUri = conf.getString(COLLECTOR_PROPERTY).trim() + WS_V1_TIMELINE_METRICS; if (collectorUri.toLowerCase().startsWith("https://")) { String trustStorePath = conf.getString(SSL_KEYSTORE_PATH_PROPERTY).trim(); String trustStoreType = conf.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim(); @@ -109,27 +112,40 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple Iterator<String> it = (Iterator<String>) conf.getKeys(); while (it.hasNext()) { String propertyName = it.next(); - if (propertyName != null && propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) { - String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length()); - String[] tags = conf.getStringArray(propertyName); - boolean 
useAllTags = false; - Set<String> set = null; - if (tags.length > 0) { - set = new HashSet<String>(); - for (String tag : tags) { - tag = tag.trim(); - useAllTags |= tag.equals("*"); - if (tag.length() > 0) { - set.add(tag); + if (propertyName != null) { + if (propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) { + String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length()); + String[] tags = conf.getStringArray(propertyName); + boolean useAllTags = false; + Set<String> set = null; + if (tags.length > 0) { + set = new HashSet<String>(); + for (String tag : tags) { + tag = tag.trim(); + useAllTags |= tag.equals("*"); + if (tag.length() > 0) { + set.add(tag); + } + } + if (useAllTags) { + set = null; } } - if (useAllTags) { - set = null; - } + useTagsMap.put(contextName, set); + } + // Customized RPC ports + if (propertyName.startsWith(RPC_METRIC_PREFIX)) { + // metric.rpc.client.port + int beginIdx = RPC_METRIC_PREFIX.length() + 1; + String suffixStr = propertyName.substring(beginIdx); // client.port + String configPrefix = suffixStr.substring(0, suffixStr.indexOf(".")); // client + rpcPortSuffixes.put(conf.getString(propertyName).trim(), configPrefix.trim()); } - useTagsMap.put(contextName, set); } } + if (!rpcPortSuffixes.isEmpty()) { + LOG.info("RPC port properties configured: " + rpcPortSuffixes); + } } /** @@ -172,14 +188,41 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple StringBuilder sb = new StringBuilder(); sb.append(contextName); sb.append('.'); - sb.append(recordName); + // Similar to GangliaContext adding processName to distinguish jvm + // metrics for co-hosted daemons. We only do this for HBase since the + // appId is shared for Master and RS. 
+ if (contextName.equals("jvm")) { + if (record.tags() != null) { + for (MetricsTag tag : record.tags()) { + if (tag.info().name().equalsIgnoreCase("processName") && + (tag.value().equals("RegionServer") || tag.value().equals("Master"))) { + sb.append(tag.value()); + sb.append('.'); + } + } + } + } + sb.append(recordName); appendPrefix(record, sb); - sb.append("."); + sb.append('.'); + + // Add port tag for rpc metrics to distinguish rpc calls based on port + if (!rpcPortSuffixes.isEmpty() && contextName.contains("rpc")) { + if (record.tags() != null) { + for (MetricsTag tag : record.tags()) { + if (tag.info().name().equalsIgnoreCase("port") && + rpcPortSuffixes.keySet().contains(tag.value())) { + sb.append(rpcPortSuffixes.get(tag.value())); + sb.append('.'); + } + } + } + } + int sbBaseLen = sb.length(); - Collection<AbstractMetric> metrics = - (Collection<AbstractMetric>) record.metrics(); + Collection<AbstractMetric> metrics = (Collection<AbstractMetric>) record.metrics(); List<TimelineMetric> metricList = new ArrayList<TimelineMetric>(); long startTime = record.timestamp(); @@ -247,4 +290,5 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple public void flush() { // TODO: Buffering implementation } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java index 528384e..4a5abcc 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java +++ 
b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java @@ -18,6 +18,34 @@ package org.apache.hadoop.metrics2.sink.timeline; +import org.apache.commons.configuration.SubsetConfiguration; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricType; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.OutputStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PROPERTY; import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE; import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL; @@ -31,28 +59,14 @@ import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; -import java.io.OutputStream; -import java.net.URL; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; - -import org.apache.commons.configuration.SubsetConfiguration; -import org.apache.hadoop.metrics2.AbstractMetric; -import org.apache.hadoop.metrics2.MetricType; -import org.apache.hadoop.metrics2.MetricsRecord; -import 
org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - @RunWith(PowerMockRunner.class) public class HadoopTimelineMetricsSinkTest { + @Before + public void setup() { + Logger.getLogger("org.apache.hadoop.metrics2.sink.timeline").setLevel(Level.DEBUG); + } + @Test @PrepareForTest({URL.class, OutputStream.class}) public void testPutMetrics() throws Exception { @@ -241,5 +255,137 @@ public class HadoopTimelineMetricsSinkTest { Assert.assertEquals(new Double(6.0), values.next()); } + @Test + public void testRPCPortSuffixHandledCorrectly() throws Exception { + HadoopTimelineMetricsSink sink = + createMockBuilder(HadoopTimelineMetricsSink.class) + .withConstructor().addMockedMethod("appendPrefix") + .addMockedMethod("emitMetrics").createNiceMock(); + + SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class); + expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes(); + expect(conf.getParent()).andReturn(null).anyTimes(); + expect(conf.getPrefix()).andReturn("service").anyTimes(); + expect(conf.getString(eq(COLLECTOR_PROPERTY))).andReturn("localhost:63188").anyTimes(); + expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes(); + + expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes(); + expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes(); + + conf.setListDelimiter(eq(',')); + expectLastCall().anyTimes(); + + Set<String> rpcPortSuffixes = new HashSet<String>() {{ + add("metric.rpc.client.port"); + add("metric.rpc.datanode.port"); + add("metric.rpc.healthcheck.port"); + }}; + + expect(conf.getKeys()).andReturn(rpcPortSuffixes.iterator()); + expect(conf.getString("metric.rpc.client.port")).andReturn("8020"); + 
expect(conf.getString("metric.rpc.datanode.port")).andReturn("8040"); + expect(conf.getString("metric.rpc.healthcheck.port")).andReturn("8060"); + AbstractMetric metric = createNiceMock(AbstractMetric.class); + expect(metric.name()).andReturn("rpc.metricName").anyTimes(); + expect(metric.value()).andReturn(1.0).once(); + expect(metric.value()).andReturn(2.0).once(); + expect(metric.value()).andReturn(3.0).once(); + expect(metric.value()).andReturn(4.0).once(); + expect(metric.value()).andReturn(5.0).once(); + expect(metric.value()).andReturn(6.0).once(); + + MetricsRecord record = createNiceMock(MetricsRecord.class); + expect(record.name()).andReturn("testMetric").anyTimes(); + expect(record.context()).andReturn("rpc").anyTimes(); + Collection<MetricsTag> tags1 = Collections.singletonList( + new MetricsTag(new MetricsInfo() { + @Override + public String name() { + return "port"; + } + + @Override + public String description() { + return null; + } + }, "8020") + ); + Collection<MetricsTag> tags2 = Collections.singletonList( + new MetricsTag(new MetricsInfo() { + @Override + public String name() { + return "port"; + } + + @Override + public String description() { + return null; + } + }, "8040") + ); + expect(record.tags()).andReturn(tags1).times(6); + expect(record.tags()).andReturn(tags2).times(6); + + sink.appendPrefix(eq(record), (StringBuilder) anyObject()); + expectLastCall().anyTimes().andStubAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + return null; + } + }); + + final Long now = System.currentTimeMillis(); + // TODO: Current implementation of cache needs > 1 elements to evict any + expect(record.timestamp()).andReturn(now).times(2); + expect(record.timestamp()).andReturn(now + 100l).times(2); + expect(record.timestamp()).andReturn(now + 200l).once(); + expect(record.timestamp()).andReturn(now + 300l).once(); + + expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes(); + + final List<TimelineMetrics> 
capturedMetrics = new ArrayList<TimelineMetrics>(); + sink.emitMetrics((TimelineMetrics) anyObject()); + expectLastCall().andStubAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + capturedMetrics.add((TimelineMetrics) EasyMock.getCurrentArguments()[0]); + return null; + } + }); + + replay(conf, sink, record, metric); + + sink.init(conf); + + // time = t1 + sink.putMetrics(record); + // time = t1 + sink.putMetrics(record); + // time = t2 + sink.putMetrics(record); + // Evict + // time = t2 + sink.putMetrics(record); + // time = t3 + sink.putMetrics(record); + // time = t4 + sink.putMetrics(record); + + verify(conf, sink, record, metric); + + Assert.assertEquals(2, capturedMetrics.size()); + Iterator<TimelineMetrics> metricsIterator = capturedMetrics.iterator(); + + // t1, t2 + TimelineMetric timelineMetric1 = metricsIterator.next().getMetrics().get(0); + Assert.assertEquals(2, timelineMetric1.getMetricValues().size()); + // Assert the tag added to the name + Assert.assertEquals("rpc.testMetric.client.rpc.metricName", timelineMetric1.getMetricName()); + // t3, t4 + TimelineMetric timelineMetric2 = metricsIterator.next().getMetrics().get(0); + Assert.assertEquals(2, timelineMetric2.getMetricValues().size()); + // Assert the tag added to the name + Assert.assertEquals("rpc.testMetric.datanode.rpc.metricName", timelineMetric2.getMetricName()); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java index 81aced4..2b863d5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java +++ 
b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java @@ -27,6 +27,7 @@ import org.apache.ambari.server.ParentObjectNotFoundException; import org.apache.ambari.server.StackAccessException; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.RootServiceResponseFactory.Services; +import org.apache.ambari.server.controller.utilities.PropertyHelper; import org.apache.ambari.server.customactions.ActionDefinition; import org.apache.ambari.server.customactions.ActionDefinitionManager; import org.apache.ambari.server.events.AlertDefinitionDisabledEvent; @@ -335,7 +336,7 @@ public class AmbariMetaInfo { DependencyInfo foundDependency = null; List<DependencyInfo> componentDependencies = getComponentDependencies( - stackName, version, service, component); + stackName, version, service, component); Iterator<DependencyInfo> iter = componentDependencies.iterator(); while (foundDependency == null && iter.hasNext()) { DependencyInfo dependency = iter.next(); @@ -363,7 +364,7 @@ public class AmbariMetaInfo { for (RepositoryInfo repo : repository) { if (!reposResult.containsKey(repo.getOsType())) { reposResult.put(repo.getOsType(), - new ArrayList<RepositoryInfo>()); + new ArrayList<RepositoryInfo>()); } reposResult.get(repo.getOsType()).add(repo); } @@ -879,7 +880,7 @@ public class AmbariMetaInfo { try { map = gson.fromJson(new FileReader(svc.getMetricsFile()), type); - svc.setMetrics(updateComponentMetricMapWithAggregateFunctionIds(map)); + svc.setMetrics(processMetricDefinition(map)); } catch (Exception e) { LOG.error ("Could not read the metrics file", e); @@ -892,43 +893,52 @@ public class AmbariMetaInfo { /** * Add aggregate function support for all stack defined metrics. + * + * Refactor Namenode RPC metrics for different kinds of ports. 
*/ - private Map<String, Map<String, List<MetricDefinition>>> updateComponentMetricMapWithAggregateFunctionIds( - Map<String, Map<String, List<MetricDefinition>>> metricMap) { + private Map<String, Map<String, List<MetricDefinition>>> processMetricDefinition( + Map<String, Map<String, List<MetricDefinition>>> metricMap) { if (!metricMap.isEmpty()) { // For every Component - for (Map<String, List<MetricDefinition>> componentMetricDef : metricMap.values()) { + for (Map.Entry<String, Map<String, List<MetricDefinition>>> componentMetricDefEntry : metricMap.entrySet()) { + String componentName = componentMetricDefEntry.getKey(); // For every Component / HostComponent category - for (Map.Entry<String, List<MetricDefinition>> metricDefEntry : componentMetricDef.entrySet()) { - // NOTE: Only Component aggregates supported for now. - if (metricDefEntry.getKey().equals(Component.name())) { - //For every metric definition - for (MetricDefinition metricDefinition : metricDefEntry.getValue()) { - // Metrics System metrics only - if (metricDefinition.getType().equals("ganglia")) { - // Create a new map for each category - for (Map<String, Metric> metricByCategory : metricDefinition.getMetricsByCategory().values()) { - Map<String, Metric> newMetrics = new HashMap<String, Metric>(); - - // For every function id - for (String identifierToAdd : AGGREGATE_FUNCTION_IDENTIFIERS) { - - for (Map.Entry<String, Metric> metricEntry : metricByCategory.entrySet()) { - String newMetricKey = metricEntry.getKey() + identifierToAdd; - Metric currentMetric = metricEntry.getValue(); - Metric newMetric = new Metric( - currentMetric.getName() + identifierToAdd, - currentMetric.isPointInTime(), - currentMetric.isTemporal(), - currentMetric.isAmsHostMetric(), - currentMetric.getUnit() - ); - newMetrics.put(newMetricKey, newMetric); + for (Map.Entry<String, List<MetricDefinition>> metricDefEntry : componentMetricDefEntry.getValue().entrySet()) { + //For every metric definition + for (MetricDefinition 
metricDefinition : metricDefEntry.getValue()) { + // Metrics System metrics only + if (metricDefinition.getType().equals("ganglia")) { + for (Map.Entry<String, Map<String, Metric>> metricByCategory : metricDefinition.getMetricsByCategory().entrySet()) { + String category = metricByCategory.getKey(); + Iterator<Map.Entry<String, Metric>> iterator = metricByCategory.getValue().entrySet().iterator(); + Map<String, Metric> newMetricsToAdd = new HashMap<>(); + + while (iterator.hasNext()) { + Map.Entry<String, Metric> metricEntry = iterator.next(); + // Process Namenode rpc metrics + Map<String, Metric> replacementMetrics = PropertyHelper.processRpcMetricDefinition( + componentName, metricEntry.getKey(), metricEntry.getValue()); + if (replacementMetrics != null) { + iterator.remove(); // Remove current metric entry + newMetricsToAdd.putAll(replacementMetrics); + // Add aggregate functions for replacement metrics + if (metricDefEntry.getKey().equals(Component.name())) { + for (Map.Entry<String, Metric> replacementMetric : replacementMetrics.entrySet()) { + newMetricsToAdd.putAll(getAggregateFunctionMetrics(replacementMetric.getKey(), + replacementMetric.getValue())); + } + } + } else { + // NOTE: Only Component aggregates supported for now. 
+ if (metricDefEntry.getKey().equals(Component.name())) { + Map<String, Metric> aggregateFunctionMetrics = + getAggregateFunctionMetrics(metricEntry.getKey(), metricEntry.getValue()); + newMetricsToAdd.putAll(aggregateFunctionMetrics); } } - metricByCategory.putAll(newMetrics); } + metricByCategory.getValue().putAll(newMetricsToAdd); } } } @@ -939,6 +949,24 @@ public class AmbariMetaInfo { return metricMap; } + private Map<String, Metric> getAggregateFunctionMetrics(String metricName, Metric currentMetric) { + Map<String, Metric> newMetrics = new HashMap<String, Metric>(); + // For every function id + for (String identifierToAdd : AGGREGATE_FUNCTION_IDENTIFIERS) { + String newMetricKey = metricName + identifierToAdd; + Metric newMetric = new Metric( + currentMetric.getName() + identifierToAdd, + currentMetric.isPointInTime(), + currentMetric.isTemporal(), + currentMetric.isAmsHostMetric(), + currentMetric.getUnit() + ); + newMetrics.put(newMetricKey, newMetric); + } + + return newMetrics; + } + /** * Gets the metrics for a Role (component). * @return the list of defined metrics. 
@@ -947,8 +975,8 @@ public class AmbariMetaInfo { String serviceName, String componentName, String metricType) throws AmbariException { - Map<String, Map<String, List<MetricDefinition>>> map = - getServiceMetrics(stackName, stackVersion, serviceName); + Map<String, Map<String, List<MetricDefinition>>> map = getServiceMetrics( + stackName, stackVersion, serviceName); if (map != null && map.containsKey(componentName)) { if (map.get(componentName).containsKey(metricType)) { http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java index a674371..4bc9fd7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java @@ -69,6 +69,13 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { private static AtomicInteger printSkipPopulateMsgHostCompCounter = new AtomicInteger(0); private static final Map<String, String> timelineAppIdCache = new ConcurrentHashMap<>(10); + private static final Map<String, String> JVM_PROCESS_NAMES = new HashMap<>(2); + + static { + JVM_PROCESS_NAMES.put("HBASE_MASTER", "Master."); + JVM_PROCESS_NAMES.put("HBASE_REGIONSERVER", "RegionServer."); + } + public AMSPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap, URLStreamProvider streamProvider, ComponentSSLConfiguration configuration, @@ -411,6 +418,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { if (metricsMap != null) { for (String propertyId : 
propertyIdSet) { if (propertyId != null) { +// propertyId = postProcessPropertyId(propertyId, getComponentName(resource)); if (metricsMap.containsKey(propertyId)){ if (containsArguments(propertyId)) { int i = 1; @@ -630,7 +638,9 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { requests.put(temporalInfo, metricsRequest); } metricsRequest.putResource(getComponentName(resource), resource); - metricsRequest.putPropertyId(propertyInfo.getPropertyId(), propertyId); + metricsRequest.putPropertyId( + preprocessPropertyId(propertyInfo.getPropertyId(), getComponentName(resource)), + propertyId); // If request is for a host metric we need to create multiple requests if (propertyInfo.isAmsHostMetric()) { metricsRequest.putHosComponentHostMetric(propertyInfo.getPropertyId()); @@ -643,6 +653,21 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { return requestMap; } + /** + * Account for the processName added to the jvm metrics by the HadoopSink. + * E.g.: jvm.RegionServer.JvmMetrics.GcTimeMillis + * + */ + private String preprocessPropertyId(String propertyId, String componentName) { + if (propertyId.startsWith("jvm") && JVM_PROCESS_NAMES.keySet().contains(componentName)) { + String newPropertyId = propertyId.replace("jvm.", "jvm." + JVM_PROCESS_NAMES.get(componentName)); + LOG.debug("Pre-process: " + propertyId + ", to: " + newPropertyId); + return newPropertyId; + } + + return propertyId; + } + static URIBuilder getAMSUriBuilder(String hostname, int port, boolean httpsEnabled) { URIBuilder uriBuilder = new URIBuilder(); uriBuilder.setScheme(httpsEnabled ? 
"https" : "http"); http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java index cefe953..41da279 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/utilities/PropertyHelper.java @@ -36,6 +36,7 @@ import org.apache.ambari.server.controller.spi.Request; import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.spi.SortRequest; import org.apache.ambari.server.controller.spi.TemporalInfo; +import org.apache.ambari.server.state.stack.Metric; import org.apache.commons.lang.StringUtils; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; @@ -67,6 +68,9 @@ public class PropertyHelper { private static final Map<Resource.InternalType, Map<String, Map<String, PropertyInfo>>> SQLSERVER_PROPERTY_IDS = readPropertyProviderIds(SQLSERVER_PROPERTIES_FILE); private static final Map<Resource.InternalType, Map<Resource.Type, String>> KEY_PROPERTY_IDS = readKeyPropertyIds(KEY_PROPERTIES_FILE); + // Suffixes to add for Namenode rpc metrics prefixes + private static final Map<String, List<String>> RPC_METRIC_SUFFIXES = new HashMap<>(); + /** * Regular expression to check for replacement arguments (e.g. $1) in a property id. 
*/ @@ -92,6 +96,11 @@ public class PropertyHelper { */ private static final Pattern METRIC_CATEGORY_TOKENIZE_REGEX = Pattern.compile("/+(?=([^\"\\\\\\\\]*(\\\\\\\\.|\"([^\"\\\\\\\\]*\\\\\\\\.)*[^\"\\\\\\\\]*\"))*[^\"]*$)"); + static { + RPC_METRIC_SUFFIXES.put("rpc.rpc", Arrays.asList("client", "datanode", "healthcheck")); + RPC_METRIC_SUFFIXES.put("rpcdetailed.rpcdetailed", Arrays.asList("client", "datanode", "healthcheck")); + } + public static String getPropertyId(String category, String name) { String propertyId = (category == null || category.isEmpty())? name : (name == null || name.isEmpty()) ? category : category + EXTERNAL_PATH_SEP + name; @@ -626,4 +635,72 @@ public class PropertyHelper { } return false; } + + + /** + * Special handle rpc port tags added to metric names for HDFS Namenode + * + * Returns the replacement definitions + */ + public static Map<String, org.apache.ambari.server.state.stack.Metric> processRpcMetricDefinition( + String componentName, String propertyId, org.apache.ambari.server.state.stack.Metric metric) { + Map<String, org.apache.ambari.server.state.stack.Metric> replacementMap = null; + if (componentName.equalsIgnoreCase("NAMENODE")) { + for (Map.Entry<String, List<String>> entry : RPC_METRIC_SUFFIXES.entrySet()) { + String prefix = entry.getKey(); + if (metric.getName().startsWith(prefix)) { + replacementMap = new HashMap<>(); + for (String suffix : entry.getValue()) { + org.apache.ambari.server.state.stack.Metric newMetric = new org.apache.ambari.server.state.stack.Metric( + insertTagInToMetricName(suffix, metric.getName(), prefix), + metric.isPointInTime(), + metric.isTemporal(), + metric.isAmsHostMetric(), + metric.getUnit() + ); + + replacementMap.put(insertTagInToMetricName(suffix, propertyId, prefix), newMetric); + } + } + } + } + return replacementMap; + } + + /** + * Returns tag inserted metric name after the prefix. 
+ * @param tag E.g.: client + * @param metricName : rpc.rpc.CallQueueLength Or metrics/rpc/CallQueueLen + * @param prefix : rpc.rpc + * @return rpc.rpc.client.CallQueueLength Or metrics/rpc/client/CallQueueLen + */ + static String insertTagInToMetricName(String tag, String metricName, String prefix) { + String sepExpr = "\\."; + String seperator = "."; + if (metricName.indexOf(EXTERNAL_PATH_SEP) != -1) { + sepExpr = Character.toString(EXTERNAL_PATH_SEP); + seperator = sepExpr; + } + String prefixSep = prefix.contains(".") ? "\\." : "" + EXTERNAL_PATH_SEP; + + // Remove separator if any + if (prefix.substring(prefix.length() - 1).equals(prefixSep)) { + prefix = prefix.substring(0, prefix.length() - 1); + } + int pos = prefix.split(prefixSep).length - 1; + String[] parts = metricName.split(sepExpr); + StringBuilder sb = new StringBuilder(); + + for (int i = 0; i < parts.length; i++) { + sb.append(parts[i]); + if (i < parts.length - 1) { + sb.append(seperator); + } + if (i == pos) { // append the tag + sb.append(tag); + sb.append(seperator); + } + } + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java index 8ea455d..78cf8bd 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog222.java @@ -18,25 +18,38 @@ package org.apache.ambari.server.upgrade; +import com.google.gson.Gson; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; import com.google.inject.Inject; import com.google.inject.Injector; import 
org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.controller.AmbariManagementController; import org.apache.ambari.server.orm.dao.AlertDefinitionDAO; import org.apache.ambari.server.orm.dao.DaoUtils; +import org.apache.ambari.server.orm.dao.WidgetDAO; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; +import org.apache.ambari.server.orm.entities.WidgetEntity; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; import org.apache.ambari.server.state.ServiceComponentHost; +import org.apache.ambari.server.state.ServiceInfo; import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.StackInfo; +import org.apache.ambari.server.state.stack.WidgetLayout; +import org.apache.ambari.server.state.stack.WidgetLayoutInfo; import org.apache.ambari.server.utils.VersionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileReader; +import java.lang.reflect.Type; import java.sql.SQLException; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -80,6 +93,9 @@ public class UpgradeCatalog222 extends AbstractUpgradeCatalog { public static final String CLUSTER_HOUR_TABLE_TTL = "timeline.metrics.cluster.aggregator.hourly.ttl"; public static final String CLUSTER_DAILY_TABLE_TTL = "timeline.metrics.cluster.aggregator.daily.ttl"; + private static final String[] HDFS_WIDGETS_TO_UPDATE = new String[] { + "NameNode RPC", "NN Connection Load" }; + // ----- Constructors ------------------------------------------------------ @@ -133,6 +149,7 @@ public class UpgradeCatalog222 extends AbstractUpgradeCatalog { updateAMSConfigs(); updateHiveConfig(); updateHostRoleCommands(); + updateHDFSWidgetDefinition(); } protected void updateStormConfigs() throws AmbariException { @@ -336,6 +353,79 @@ 
public class UpgradeCatalog222 extends AbstractUpgradeCatalog { } } + protected void updateHDFSWidgetDefinition() throws AmbariException { + LOG.info("Updating HDFS widget definition."); + AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class); + AmbariMetaInfo ambariMetaInfo = injector.getInstance(AmbariMetaInfo.class); + Type widgetLayoutType = new TypeToken<Map<String, List<WidgetLayout>>>(){}.getType(); + Gson gson = injector.getInstance(Gson.class); + WidgetDAO widgetDAO = injector.getInstance(WidgetDAO.class); + + Clusters clusters = ambariManagementController.getClusters(); + + Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters); + for (final Cluster cluster : clusterMap.values()) { + long clusterID = cluster.getClusterId(); + + for (String widgetName : HDFS_WIDGETS_TO_UPDATE) { + List<WidgetEntity> widgetEntities = widgetDAO.findByName(clusterID, + widgetName, "ambari", "HDFS_SUMMARY"); + + if (widgetEntities != null) { + WidgetEntity entityToUpdate = null; + if (widgetEntities.size() > 1) { + LOG.info("Found more that 1 entity with name = "+ widgetName + + " for cluster = " + cluster.getClusterName() + ", skipping update."); + } else { + entityToUpdate = widgetEntities.iterator().next(); + } + if (entityToUpdate != null) { + LOG.info("Updating widget: " + entityToUpdate.getWidgetName()); + // Get the definition from widgets.json file + WidgetLayoutInfo targetWidgetLayoutInfo = null; + StackId stackId = cluster.getDesiredStackVersion(); + Map<String, Object> widgetDescriptor = null; + StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion()); + ServiceInfo serviceInfo = stackInfo.getService("HDFS"); + File widgetDescriptorFile = serviceInfo.getWidgetsDescriptorFile(); + if (widgetDescriptorFile != null && widgetDescriptorFile.exists()) { + try { + widgetDescriptor = gson.fromJson(new FileReader(widgetDescriptorFile), widgetLayoutType); + } catch 
(Exception ex) { + String msg = "Error loading widgets from file: " + widgetDescriptorFile; + LOG.error(msg, ex); + widgetDescriptor = null; + } + } + if (widgetDescriptor != null) { + LOG.debug("Loaded widget descriptor: " + widgetDescriptor); + for (Object artifact : widgetDescriptor.values()) { + List<WidgetLayout> widgetLayouts = (List<WidgetLayout>) artifact; + for (WidgetLayout widgetLayout : widgetLayouts) { + if (widgetLayout.getLayoutName().equals("default_hdfs_dashboard")) { + for (WidgetLayoutInfo layoutInfo : widgetLayout.getWidgetLayoutInfoList()) { + if (layoutInfo.getWidgetName().equals(widgetName)) { + targetWidgetLayoutInfo = layoutInfo; + } + } + } + } + } + } + if (targetWidgetLayoutInfo != null) { + entityToUpdate.setMetrics(gson.toJson(targetWidgetLayoutInfo.getMetricsInfo())); + entityToUpdate.setWidgetValues(gson.toJson(targetWidgetLayoutInfo.getValues())); + widgetDAO.merge(entityToUpdate); + } else { + LOG.warn("Unable to find widget layout info for " + widgetName + + " in the stack: " + stackId); + } + } + } + } + } + } + protected void updateHiveConfig() throws AmbariException { AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class); for (final Cluster cluster : getCheckedClusterMap(ambariManagementController.getClusters()).values()) { http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json index 7e93a6e..89aab13 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/widgets.json @@ -74,8 +74,16 @@ "is_visible": true, "metrics": [ { - "name": 
"rpc.rpc.NumOpenConnections", - "metric_path": "metrics/rpc/NumOpenConnections", + "name": "rpc.rpc.client.NumOpenConnections", + "metric_path": "metrics/rpc/client/NumOpenConnections", + "category": "", + "service_name": "HDFS", + "component_name": "NAMENODE", + "host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active" + }, + { + "name": "rpc.rpc.datanode.NumOpenConnections", + "metric_path": "metrics/rpc/datanode/NumOpenConnections", "category": "", "service_name": "HDFS", "component_name": "NAMENODE", @@ -84,8 +92,12 @@ ], "values": [ { - "name": "Open Connections", - "value": "${rpc.rpc.NumOpenConnections}" + "name": "Open Client Connections", + "value": "${rpc.rpc.client.NumOpenConnections}" + }, + { + "name": "Open Datanode Connections", + "value": "${rpc.rpc.datanode.NumOpenConnections}" } ], "properties": { @@ -216,15 +228,29 @@ "is_visible": true, "metrics": [ { - "name": "rpc.rpc.RpcQueueTimeAvgTime", - "metric_path": "metrics/rpc/RpcQueueTime_avg_time", + "name": "rpc.rpc.client.RpcQueueTimeAvgTime", + "metric_path": "metrics/rpc/client/RpcQueueTime_avg_time", + "service_name": "HDFS", + "component_name": "NAMENODE", + "host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active" + }, + { + "name": "rpc.rpc.client.RpcProcessingTimeAvgTime", + "metric_path": "metrics/rpc/client/RpcProcessingTime_avg_time", "service_name": "HDFS", "component_name": "NAMENODE", "host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active" }, { - "name": "rpc.rpc.RpcProcessingTimeAvgTime", - "metric_path": "metrics/rpc/RpcProcessingTime_avg_time", + "name": "rpc.rpc.datanode.RpcQueueTimeAvgTime", + "metric_path": "metrics/rpc/datanode/RpcQueueTime_avg_time", + "service_name": "HDFS", + "component_name": "NAMENODE", + "host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active" + }, + { + "name": "rpc.rpc.datanode.RpcProcessingTimeAvgTime", + "metric_path": 
"metrics/rpc/datanode/RpcProcessingTime_avg_time", "service_name": "HDFS", "component_name": "NAMENODE", "host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active" @@ -232,12 +258,20 @@ ], "values": [ { - "name": "RPC Queue Wait time", - "value": "${rpc.rpc.RpcQueueTimeAvgTime}" + "name": "Client RPC Queue Wait time", + "value": "${rpc.rpc.client.RpcQueueTimeAvgTime}" + }, + { + "name": "Client RPC Processing time", + "value": "${rpc.rpc.client.RpcProcessingTimeAvgTime}" + }, + { + "name": "Datanode RPC Queue Wait time", + "value": "${rpc.rpc.datanode.RpcQueueTimeAvgTime}" }, { - "name": "RPC Processing time", - "value": "${rpc.rpc.RpcProcessingTimeAvgTime}" + "name": "Datanode RPC Processing time", + "value": "${rpc.rpc.datanode.RpcProcessingTimeAvgTime}" } ], "properties": { http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py index 4d129c7..069d1ae 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py @@ -236,6 +236,63 @@ smoke_user = config['configurations']['cluster-env']['smokeuser'] smoke_hdfs_user_dir = format("/user/{smoke_user}") smoke_hdfs_user_mode = 0770 + +##### Namenode RPC ports - metrics config section start ##### + +# Figure out the rpc ports for current namenode +nn_rpc_client_port = None +nn_rpc_dn_port = None +nn_rpc_healthcheck_port = None + +namenode_id = None +namenode_rpc = None + +dfs_ha_enabled = False +dfs_ha_nameservices = default("/configurations/hdfs-site/dfs.nameservices", None) +dfs_ha_namenode_ids = 
##### Namenode RPC ports - metrics config section start #####

# Figure out the rpc ports for current namenode. The resulting
# nn_rpc_*_port / is_nn_*_port_configured values are consumed by the
# hadoop-metrics2.properties template to tag NameNode RPC metrics per port.

def _extract_rpc_port(address):
  """Return the port of a 'host:port' address, or None when there is none."""
  if not address or ':' not in address:
    return None
  port = address.rsplit(':', 1)[1].strip()
  return port or None

nn_rpc_client_port = None
nn_rpc_dn_port = None
nn_rpc_healthcheck_port = None

namenode_id = None
namenode_rpc = None

dfs_ha_enabled = False
dfs_ha_nameservices = default("/configurations/hdfs-site/dfs.nameservices", None)
dfs_ha_namenode_ids = default(format("/configurations/hdfs-site/dfs.ha.namenodes.{dfs_ha_nameservices}"), None)

dfs_ha_namemodes_ids_list = []
other_namenode_id = None

if dfs_ha_namenode_ids:
  # Tolerate "nn1, nn2" style lists: stray whitespace would otherwise end up
  # inside the per-namenode property names built below.
  dfs_ha_namemodes_ids_list = [ha_nn_id.strip() for ha_nn_id in dfs_ha_namenode_ids.split(",") if ha_nn_id.strip()]
  dfs_ha_namenode_ids_array_len = len(dfs_ha_namemodes_ids_list)
  if dfs_ha_namenode_ids_array_len > 1:
    dfs_ha_enabled = True

if dfs_ha_enabled:
  for nn_id in dfs_ha_namemodes_ids_list:
    # default() instead of direct indexing: a namenode id without an
    # rpc-address entry must not abort the whole hook with a KeyError.
    nn_host = default(format('/configurations/hdfs-site/dfs.namenode.rpc-address.{dfs_ha_nameservices}.{nn_id}'), None)
    # NOTE(review): substring match, so host "nn1" also matches "nn10..." - confirm
    # whether an exact host comparison is wanted here.
    if nn_host and hostname in nn_host:
      namenode_id = nn_id
      namenode_rpc = nn_host
else:
  namenode_rpc = default('/configurations/hdfs-site/dfs.namenode.rpc-address', None)

nn_rpc_client_port = _extract_rpc_port(namenode_rpc)

dfs_service_rpc_address = None
dfs_lifeline_rpc_address = None

if dfs_ha_enabled:
  # In HA the per-namenode keys are qualified by nameservice AND namenode id
  # (dfs.namenode.servicerpc-address.<nameservice>.<nn id>), same as rpc-address
  # above. Skipped when this host is not one of the namenodes.
  if namenode_id:
    dfs_service_rpc_address = default(format('/configurations/hdfs-site/dfs.namenode.servicerpc-address.{dfs_ha_nameservices}.{namenode_id}'), None)
    dfs_lifeline_rpc_address = default(format('/configurations/hdfs-site/dfs.namenode.lifeline.rpc-address.{dfs_ha_nameservices}.{namenode_id}'), None)
else:
  dfs_service_rpc_address = default('/configurations/hdfs-site/dfs.namenode.servicerpc-address', None)
  dfs_lifeline_rpc_address = default('/configurations/hdfs-site/dfs.namenode.lifeline.rpc-address', None)

nn_rpc_dn_port = _extract_rpc_port(dfs_service_rpc_address)
nn_rpc_healthcheck_port = _extract_rpc_port(dfs_lifeline_rpc_address)

is_nn_client_port_configured = nn_rpc_client_port is not None
is_nn_dn_port_configured = nn_rpc_dn_port is not None
is_nn_healthcheck_port_configured = nn_rpc_healthcheck_port is not None

##### end #####
http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 index f9c2164..47b504f 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 @@ -91,4 +91,15 @@ reducetask.sink.timeline.collector={{metric_collector_protocol}}://{{metric_coll resourcemanager.sink.timeline.tagsForPrefix.yarn=Queue +{% if is_nn_client_port_configured %} +# Namenode rpc ports customization +namenode.sink.timeline.metric.rpc.client.port={{nn_rpc_client_port}} +{% endif %} +{% if is_nn_dn_port_configured %} +namenode.sink.timeline.metric.rpc.datanode.port={{nn_rpc_dn_port}} +{% endif %} +{% if is_nn_healthcheck_port_configured %} +namenode.sink.timeline.metric.rpc.healthcheck.port={{nn_rpc_healthcheck_port}} +{% endif %} + {% endif %} http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java index f4c212c..fda5e79 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java +++ 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackArtifactResourceProviderTest.java @@ -118,8 +118,7 @@ public class StackArtifactResourceProviderTest { Map<String, Object> descriptor = propertyMap.get(ARTIFACT_DATA_PROPERTY_ID + "/HDFS/DATANODE"); Assert.assertNotNull(descriptor); Assert.assertEquals(1, ((ArrayList) descriptor.get("Component")).size()); - MetricDefinition md = (MetricDefinition) ((ArrayList) descriptor.get - ("Component")).iterator().next(); + MetricDefinition md = (MetricDefinition) ((ArrayList) descriptor.get("Component")).iterator().next(); Metric m1 = md.getMetrics().get("metrics/dfs/datanode/heartBeats_avg_time"); Metric m2 = md.getMetrics().get("metrics/rpc/closeRegion_num_ops"); @@ -132,6 +131,53 @@ public class StackArtifactResourceProviderTest { } @Test + public void testGetMetricsDescriptorRpcForNamenode() throws Exception { + AmbariManagementController managementController = createNiceMock(AmbariManagementController.class); + + expect(managementController.getAmbariMetaInfo()).andReturn(metaInfo).anyTimes(); + + replay(managementController); + + StackArtifactResourceProvider resourceProvider = getStackArtifactResourceProvider(managementController); + + Set<String> propertyIds = new HashSet<String>(); + propertyIds.add(ARTIFACT_NAME_PROPERTY_ID); + propertyIds.add(STACK_NAME_PROPERTY_ID); + propertyIds.add(STACK_VERSION_PROPERTY_ID); + propertyIds.add(STACK_SERVICE_NAME_PROPERTY_ID); + propertyIds.add(ARTIFACT_DATA_PROPERTY_ID); + + Request request = PropertyHelper.getReadRequest(propertyIds); + + Predicate predicate = new PredicateBuilder().property + (ARTIFACT_NAME_PROPERTY_ID).equals("metrics_descriptor").and().property + (STACK_NAME_PROPERTY_ID).equals("OTHER").and().property + (STACK_VERSION_PROPERTY_ID).equals("1.0").and().property + (STACK_SERVICE_NAME_PROPERTY_ID).equals("HDFS").toPredicate(); + + Set<Resource> resources = resourceProvider.getResources(request, predicate); + + Assert.assertEquals(1, 
resources.size()); + Resource resource = resources.iterator().next(); + Map<String, Map<String, Object>> propertyMap = resource.getPropertiesMap(); + Map<String, Object> descriptor = propertyMap.get(ARTIFACT_DATA_PROPERTY_ID + "/HDFS/NAMENODE"); + Assert.assertNotNull(descriptor); + Assert.assertEquals(2, ((ArrayList) descriptor.get("Component")).size()); + MetricDefinition md = (MetricDefinition) ((ArrayList) descriptor.get("Component")).iterator().next(); + + Assert.assertEquals("rpcdetailed.rpcdetailed.client.BlockReceivedAndDeletedAvgTime", + md.getMetrics().get("metrics/rpcdetailed/client/blockReceived_avg_time").getName()); + + Assert.assertEquals("rpc.rpc.healthcheck.CallQueueLength", + md.getMetrics().get("metrics/rpc/healthcheck/callQueueLen").getName()); + + Assert.assertEquals("rpcdetailed.rpcdetailed.datanode.DeleteNumOps", + md.getMetrics().get("metrics/rpcdetailed/datanode/delete_num_ops").getName()); + + verify(managementController); + } + + @Test @SuppressWarnings("unchecked") public void testGetWidgetDescriptorForService() throws Exception { AmbariManagementController managementController = createNiceMock(AmbariManagementController.class); http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java index d450177..2beb462 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/utilities/PropertyHelperTest.java @@ -302,6 +302,20 @@ public class PropertyHelperTest { } } + @Test + public void testInsertTagIntoMetricName() { + 
Assert.assertEquals("rpc.rpc.client.CallQueueLength", + PropertyHelper.insertTagInToMetricName("client", "rpc.rpc.CallQueueLength", "rpc.rpc")); + + Assert.assertEquals("rpc.rpc.client.CallQueueLength", + PropertyHelper.insertTagInToMetricName("client", "rpc.rpc.CallQueueLength", "rpc.rpc.")); + + Assert.assertEquals("metrics/rpc/client/CallQueueLen", + PropertyHelper.insertTagInToMetricName("client", "metrics/rpc/CallQueueLen", "rpc.rpc")); + + Assert.assertEquals("metrics/rpc/client/CallQueueLen", + PropertyHelper.insertTagInToMetricName("client", "metrics/rpc/CallQueueLen", "rpc.rpc.")); + } // remove any replacement tokens (e.g. $1.replaceAll(\",q(\\d+)=\",\"/\").substring(1)) in the metric names private static Map<String, Map<String, PropertyInfo>> normalizeMetricNames(Map<String, Map<String, PropertyInfo>> gids) { http://git-wip-us.apache.org/repos/asf/ambari/blob/f3659cce/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java index 1d18206..f6dcb18 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog222Test.java @@ -40,24 +40,34 @@ import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.dao.AlertDefinitionDAO; import org.apache.ambari.server.orm.dao.StackDAO; +import org.apache.ambari.server.orm.dao.WidgetDAO; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; import org.apache.ambari.server.orm.entities.StackEntity; +import org.apache.ambari.server.orm.entities.WidgetEntity; +import 
org.apache.ambari.server.stack.StackManagerFactory; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; import org.apache.ambari.server.state.ServiceComponentHost; +import org.apache.ambari.server.state.ServiceInfo; import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.StackInfo; import org.apache.ambari.server.state.stack.OsFamily; +import org.apache.commons.io.FileUtils; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import javax.persistence.EntityManager; +import java.io.File; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -83,7 +93,8 @@ public class UpgradeCatalog222Test { private UpgradeCatalogHelper upgradeCatalogHelper; private StackEntity desiredStackEntity; - + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Before public void init() { @@ -114,16 +125,18 @@ public class UpgradeCatalog222Test { Method updateAMSConfigs = UpgradeCatalog222.class.getDeclaredMethod("updateAMSConfigs"); Method updateHiveConfigs = UpgradeCatalog222.class.getDeclaredMethod("updateHiveConfig"); Method updateHostRoleCommands = UpgradeCatalog222.class.getDeclaredMethod("updateHostRoleCommands"); + Method updateHDFSWidget = UpgradeCatalog222.class.getDeclaredMethod("updateHDFSWidgetDefinition"); UpgradeCatalog222 upgradeCatalog222 = createMockBuilder(UpgradeCatalog222.class) - .addMockedMethod(addNewConfigurationsFromXml) - .addMockedMethod(updateAlerts) - .addMockedMethod(updateStormConfigs) - .addMockedMethod(updateAMSConfigs) - .addMockedMethod(updateHiveConfigs) - .addMockedMethod(updateHostRoleCommands) - .createMock(); + 
.addMockedMethod(addNewConfigurationsFromXml) + .addMockedMethod(updateAlerts) + .addMockedMethod(updateStormConfigs) + .addMockedMethod(updateAMSConfigs) + .addMockedMethod(updateHiveConfigs) + .addMockedMethod(updateHostRoleCommands) + .addMockedMethod(updateHDFSWidget) + .createMock(); upgradeCatalog222.addNewConfigurationsFromXml(); expectLastCall().once(); @@ -137,6 +150,8 @@ public class UpgradeCatalog222Test { expectLastCall().once(); upgradeCatalog222.updateHiveConfig(); expectLastCall().once(); + upgradeCatalog222.updateHDFSWidgetDefinition(); + expectLastCall().once(); replay(upgradeCatalog222); @@ -176,8 +191,8 @@ public class UpgradeCatalog222Test { expect(mockAlertDefinitionDAO.findByName(eq(clusterId), eq("yarn_app_timeline_server_webui"))) .andReturn(mockATSWebAlert).atLeastOnce(); expect(mockATSWebAlert.getSource()).andReturn("{\"uri\": {\n" + - " \"http\": \"{{yarn-site/yarn.timeline-service.webapp.address}}/ws/v1/timeline\",\n" + - " \"https\": \"{{yarn-site/yarn.timeline-service.webapp.https.address}}/ws/v1/timeline\" } }"); + " \"http\": \"{{yarn-site/yarn.timeline-service.webapp.address}}/ws/v1/timeline\",\n" + + " \"https\": \"{{yarn-site/yarn.timeline-service.webapp.https.address}}/ws/v1/timeline\" } }"); mockATSWebAlert.setSource("{\"uri\":{\"http\":\"{{yarn-site/yarn.timeline-service.webapp.address}}/ws/v1/timeline\",\"https\":\"{{yarn-site/yarn.timeline-service.webapp.https.address}}/ws/v1/timeline\"}}"); expectLastCall().once(); @@ -255,7 +270,6 @@ public class UpgradeCatalog222Test { easyMockSupport.verifyAll(); } - @Test public void testAmsSiteUpdateConfigs() throws Exception{ @@ -330,7 +344,60 @@ public class UpgradeCatalog222Test { Map<String, String> updatedProperties = propertiesCapture.getValue(); assertTrue(Maps.difference(newPropertiesAmsSite, updatedProperties).areEqual()); + } + @Test + public void testHDFSWidgetUpdate() throws Exception { + final Clusters clusters = createNiceMock(Clusters.class); + final Cluster cluster = 
createNiceMock(Cluster.class); + final AmbariManagementController controller = createNiceMock(AmbariManagementController.class); + final Gson gson = new Gson(); + final WidgetDAO widgetDAO = createNiceMock(WidgetDAO.class); + final AmbariMetaInfo metaInfo = createNiceMock(AmbariMetaInfo.class); + WidgetEntity widgetEntity = createNiceMock(WidgetEntity.class); + StackId stackId = new StackId("HDP", "2.0.0"); + StackInfo stackInfo = createNiceMock(StackInfo.class); + ServiceInfo serviceInfo = createNiceMock(ServiceInfo.class); + + String widgetStr = "{\"layouts\":[{\"layout_name\":\"default_hdfs_dashboard\",\"display_name\":\"Standard HDFS Dashboard\",\"section_name\":\"HDFS_SUMMARY\",\"widgetLayoutInfo\":[{\"widget_name\":\"NameNode RPC\",\"metrics\":[],\"values\":[]}]}]}"; + + File dataDirectory = temporaryFolder.newFolder(); + File file = new File(dataDirectory, "hdfs_widget.json"); + FileUtils.writeStringToFile(file, widgetStr); + + final Injector mockInjector = Guice.createInjector(new AbstractModule() { + @Override + protected void configure() { + bind(EntityManager.class).toInstance(createNiceMock(EntityManager.class)); + bind(AmbariManagementController.class).toInstance(controller); + bind(Clusters.class).toInstance(clusters); + bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class)); + bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class)); + bind(Gson.class).toInstance(gson); + bind(WidgetDAO.class).toInstance(widgetDAO); + bind(StackManagerFactory.class).toInstance(createNiceMock(StackManagerFactory.class)); + bind(AmbariMetaInfo.class).toInstance(metaInfo); + } + }); + expect(controller.getClusters()).andReturn(clusters).anyTimes(); + expect(clusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{ + put("normal", cluster); + }}).anyTimes(); + expect(cluster.getClusterId()).andReturn(1L).anyTimes(); + expect(widgetDAO.findByName(1L, "NameNode RPC", "ambari", "HDFS_SUMMARY")) + 
.andReturn(Collections.singletonList(widgetEntity)); + expect(cluster.getDesiredStackVersion()).andReturn(stackId); + expect(metaInfo.getStack("HDP", "2.0.0")).andReturn(stackInfo); + expect(stackInfo.getService("HDFS")).andReturn(serviceInfo); + expect(serviceInfo.getWidgetsDescriptorFile()).andReturn(file); + expect(widgetDAO.merge(widgetEntity)).andReturn(null); + expect(widgetEntity.getWidgetName()).andReturn("Namenode RPC").anyTimes(); + + replay(clusters, cluster, controller, widgetDAO, metaInfo, widgetEntity, stackInfo, serviceInfo); + + mockInjector.getInstance(UpgradeCatalog222.class).updateHDFSWidgetDefinition(); + + verify(clusters, cluster, controller, widgetDAO, widgetEntity, stackInfo, serviceInfo); } @Test