This is an automated email from the ASF dual-hosted git repository.
jialiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-metrics.git
The following commit(s) were added to refs/heads/master by this push:
new b1ac7d7 AMBARI-26492: Drop ambari-metrics-flume-sink (#165)
b1ac7d7 is described below
commit b1ac7d78d73818ae896bff279ef7d3f27ef76741
Author: Jhen-Yung Hsu <[email protected]>
AuthorDate: Tue May 13 08:47:25 2025 +0800
AMBARI-26492: Drop ambari-metrics-flume-sink (#165)
Co-authored-by: yungh <[email protected]>
---
ambari-metrics-assembly/pom.xml | 29 ---
ambari-metrics-assembly/src/main/assembly/sink.xml | 9 -
.../src/main/package/deb/control/postinst | 7 +-
.../src/main/package/rpm/sink/postinstall.sh | 7 +-
ambari-metrics-flume-sink/pom.xml | 147 -----------
.../src/main/assemblies/empty.xml | 21 --
.../src/main/assemblies/jar-with-common.xml | 35 ---
.../src/main/conf/flume-metrics2.properties.j2 | 31 ---
.../sink/flume/FlumeTimelineMetricsSink.java | 272 ---------------------
.../sink/flume/FlumeTimelineMetricsSinkTest.java | 172 -------------
pom.xml | 1 -
11 files changed, 4 insertions(+), 727 deletions(-)
diff --git a/ambari-metrics-assembly/pom.xml b/ambari-metrics-assembly/pom.xml
index 78bb467..f3b2412 100644
--- a/ambari-metrics-assembly/pom.xml
+++ b/ambari-metrics-assembly/pom.xml
@@ -39,7 +39,6 @@
<grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir>
<hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
<storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
- <flume-sink.dir>${project.basedir}/../ambari-metrics-flume-sink</flume-sink.dir>
<kafka-sink.dir>${project.basedir}/../ambari-metrics-kafka-sink</kafka-sink.dir>
<python.ver>python3 >= 3.0</python.ver>
<python.devel>python3-devel</python.devel>
@@ -53,7 +52,6 @@
<rpm.arch>x86_64</rpm.arch>
<hadoop.sink.jar>ambari-metrics-hadoop-sink-with-common-${project.version}.jar</hadoop.sink.jar>
<storm.sink.jar>ambari-metrics-storm-sink-with-common-${project.version}.jar</storm.sink.jar>
- <flume.sink.jar>ambari-metrics-flume-sink-with-common-${project.version}.jar</flume.sink.jar>
<kafka.sink.jar>ambari-metrics-kafka-sink-with-common-${project.version}.jar</kafka.sink.jar>
</properties>
@@ -467,14 +465,6 @@
</source>
</sources>
</mapping>
- <mapping>
- <directory>/usr/lib/flume/lib</directory>
- <sources>
- <source>
- <location>${flume-sink.dir}/target/ambari-metrics-flume-sink-with-common-${project.version}.jar</location>
- </source>
- </sources>
- </mapping>
<mapping>
<directory>/usr/lib/storm/lib</directory>
<sources>
@@ -815,7 +805,6 @@
<path>/usr/lib/ambari-metrics-hadoop-sink</path>
<path>/usr/lib/ambari-metrics-kafka-sink</path>
<path>/usr/lib/ambari-metrics-kafka-sink/lib</path>
- <path>/usr/lib/flume/lib</path>
<path>/usr/lib/storm/lib</path>
</paths>
</data>
@@ -1116,19 +1105,6 @@
</mapper>
</data>
- <!-- flume sink -->
-
- <data>
- <src>${flume-sink.dir}/target/${flume.sink.jar}</src>
- <type>file</type>
- <mapper>
- <type>perm</type>
- <filemode>644</filemode>
- <dirmode>755</dirmode>
- <prefix>/usr/lib/flume/lib</prefix>
- </mapper>
- </data>
-
<!-- storm sinks -->
<data>
@@ -1259,11 +1235,6 @@
<artifactId>ambari-metrics-hadoop-sink</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.ambari</groupId>
- <artifactId>ambari-metrics-flume-sink</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.ambari</groupId>
<artifactId>ambari-metrics-storm-sink</artifactId>
diff --git a/ambari-metrics-assembly/src/main/assembly/sink.xml b/ambari-metrics-assembly/src/main/assembly/sink.xml
index 1400c7b..f7fdf0f 100644
--- a/ambari-metrics-assembly/src/main/assembly/sink.xml
+++ b/ambari-metrics-assembly/src/main/assembly/sink.xml
@@ -30,10 +30,6 @@
<directory>${hadoop-sink.dir}/src/main/conf</directory>
<outputDirectory>hadoop-sink/conf</outputDirectory>
</fileSet>
- <fileSet>
- <directory>${flume-sink.dir}/src/main/conf</directory>
- <outputDirectory>hadoop-sink/conf</outputDirectory>
- </fileSet>
<fileSet>
<directory>${storm-sink.dir}/src/main/conf</directory>
<outputDirectory>hadoop-sink/conf</outputDirectory>
@@ -50,11 +46,6 @@
<source>${hadoop-sink.dir}/target/ambari-metrics-hadoop-sink-with-common-${project.version}.jar</source>
<outputDirectory>hadoop-sink</outputDirectory>
</file>
- <file>
- <fileMode>644</fileMode>
- <source>${flume-sink.dir}/target/ambari-metrics-flume-sink-with-common-${project.version}.jar</source>
- <outputDirectory>hadoop-sink</outputDirectory>
- </file>
<file>
<fileMode>644</fileMode>
<source>${storm-sink.dir}/target/ambari-metrics-storm-sink-with-common-${project.version}.jar</source>
diff --git a/ambari-metrics-assembly/src/main/package/deb/control/postinst b/ambari-metrics-assembly/src/main/package/deb/control/postinst
index e75d557..3d68b05 100644
--- a/ambari-metrics-assembly/src/main/package/deb/control/postinst
+++ b/ambari-metrics-assembly/src/main/package/deb/control/postinst
@@ -17,9 +17,6 @@
HADOOP_LINK_NAME="/usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar"
HADOOP_SINK_JAR="/usr/lib/ambari-metrics-hadoop-sink/${hadoop.sink.jar}"
-FLUME_LINK_NAME="/usr/lib/flume/lib/ambari-metrics-flume-sink.jar"
-FLUME_SINK_JAR="/usr/lib/flume/lib/${flume.sink.jar}"
-
KAFKA_LINK_NAME="/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar"
KAFKA_SINK_JAR="/usr/lib/ambari-metrics-kafka-sink/${kafka.sink.jar}"
@@ -27,8 +24,8 @@
KAFKA_SINK_JAR="/usr/lib/ambari-metrics-kafka-sink/${kafka.sink.jar}"
#STORM_SINK_JAR="/usr/lib/storm/lib/${storm.sink.jar}"
#STORM_LINK_NAME="/usr/lib/storm/lib/ambari-metrics-storm-sink.jar"
-JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR} ${KAFKA_SINK_JAR})
-LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME} ${KAFKA_LINK_NAME})
+JARS=(${HADOOP_SINK_JAR} ${KAFKA_SINK_JAR})
+LINKS=(${HADOOP_LINK_NAME} ${KAFKA_LINK_NAME})
for index in ${!LINKS[*]}
do
diff --git a/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh b/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh
index e75d557..3d68b05 100644
--- a/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh
+++ b/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh
@@ -17,9 +17,6 @@
HADOOP_LINK_NAME="/usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar"
HADOOP_SINK_JAR="/usr/lib/ambari-metrics-hadoop-sink/${hadoop.sink.jar}"
-FLUME_LINK_NAME="/usr/lib/flume/lib/ambari-metrics-flume-sink.jar"
-FLUME_SINK_JAR="/usr/lib/flume/lib/${flume.sink.jar}"
-
KAFKA_LINK_NAME="/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar"
KAFKA_SINK_JAR="/usr/lib/ambari-metrics-kafka-sink/${kafka.sink.jar}"
@@ -27,8 +24,8 @@
KAFKA_SINK_JAR="/usr/lib/ambari-metrics-kafka-sink/${kafka.sink.jar}"
#STORM_SINK_JAR="/usr/lib/storm/lib/${storm.sink.jar}"
#STORM_LINK_NAME="/usr/lib/storm/lib/ambari-metrics-storm-sink.jar"
-JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR} ${KAFKA_SINK_JAR})
-LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME} ${KAFKA_LINK_NAME})
+JARS=(${HADOOP_SINK_JAR} ${KAFKA_SINK_JAR})
+LINKS=(${HADOOP_LINK_NAME} ${KAFKA_LINK_NAME})
for index in ${!LINKS[*]}
do
diff --git a/ambari-metrics-flume-sink/pom.xml b/ambari-metrics-flume-sink/pom.xml
deleted file mode 100644
index 6ac7f68..0000000
--- a/ambari-metrics-flume-sink/pom.xml
+++ /dev/null
@@ -1,147 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
- http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>ambari-metrics</artifactId>
- <groupId>org.apache.ambari</groupId>
- <version>3.1.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>ambari-metrics-flume-sink</artifactId>
- <version>3.1.0-SNAPSHOT</version>
- <name>Ambari Metrics Flume Sink</name>
- <packaging>jar</packaging>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <configuration>
- <descriptors>
- <descriptor>src/main/assemblies/jar-with-common.xml</descriptor>
- </descriptors>
- <attach>false</attach>
- <tarLongFileMode>gnu</tarLongFileMode>
- <appendAssemblyId>false</appendAssemblyId>
- <finalName>${project.artifactId}-with-common-${project.version}</finalName>
- </configuration>
- <id>build-jar</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.2</version>
- </plugin>
- <plugin>
- <groupId>com.github.goldin</groupId>
- <artifactId>copy-maven-plugin</artifactId>
- <version>0.2.5</version>
- <executions>
- <execution>
- <id>create-archive</id>
- <phase>none</phase>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.vafer</groupId>
- <artifactId>jdeb</artifactId>
- <executions>
- <execution>
- <!--Stub execution on direct plugin call - workaround for ambari deb build process-->
- <id>stub-execution</id>
- <phase>none</phase>
- <goals>
- <goal>jdeb</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <skip>true</skip>
- <attach>false</attach>
- <submodules>false</submodules>
- <controlDir>${project.basedir}/../src/main/package/deb/control</controlDir>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <version>1.5.1</version>
- <scope>compile</scope>
- <exclusions>
- <exclusion>
- <artifactId>libthrift</artifactId>
- <groupId>org.apache.thrift</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jetty-util</artifactId>
- <groupId>org.mortbay.jetty</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.ambari</groupId>
- <artifactId>ambari-metrics-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <version>1.9.13</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- <version>4.10</version>
- </dependency>
- <dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
- <version>3.2</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-api-easymock</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-junit4</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>18.0</version>
- </dependency>
- </dependencies>
-</project>
diff --git a/ambari-metrics-flume-sink/src/main/assemblies/empty.xml b/ambari-metrics-flume-sink/src/main/assemblies/empty.xml
deleted file mode 100644
index 35738b1..0000000
--- a/ambari-metrics-flume-sink/src/main/assemblies/empty.xml
+++ /dev/null
@@ -1,21 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<assembly>
- <id>empty</id>
- <formats/>
-</assembly>
diff --git a/ambari-metrics-flume-sink/src/main/assemblies/jar-with-common.xml b/ambari-metrics-flume-sink/src/main/assemblies/jar-with-common.xml
deleted file mode 100644
index bfd8b29..0000000
--- a/ambari-metrics-flume-sink/src/main/assemblies/jar-with-common.xml
+++ /dev/null
@@ -1,35 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<assembly>
- <id>jar-with-common</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <dependencySets>
- <dependencySet>
- <fileMode>644</fileMode>
- <outputDirectory>/</outputDirectory>
- <unpack>true</unpack>
- <includes>
- <include>org.apache.ambari:ambari-metrics-common</include>
- <include>org.apache.ambari:ambari-metrics-flume-sink</include>
- </includes>
- </dependencySet>
- </dependencySets>
-</assembly>
diff --git a/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2 b/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
deleted file mode 100644
index 58c5f09..0000000
--- a/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
+++ /dev/null
@@ -1,31 +0,0 @@
-{#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-collector=http://localhost:6188
-collectionFrequency=60000
-maxRowCacheSize=10000
-sendInterval={{metrics_report_interval}}000
-clusterReporterAppId=nimbus
-host_in_memory_aggregation = {{host_in_memory_aggregation}}
-host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
-{% if is_aggregation_https_enabled %}
-host_in_memory_aggregation_protocol = {{host_in_memory_aggregation_protocol}}
-{% endif %}
-
-# Metric names having type COUNTER
-counters=EventTakeSuccessCount,EventPutSuccessCount,EventTakeAttemptCount,EventPutAttemptCount
diff --git a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
deleted file mode 100644
index 720c371..0000000
--- a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.flume;
-
-import org.apache.commons.lang.math.NumberUtils;
-import org.apache.flume.Context;
-import org.apache.flume.FlumeException;
-import org.apache.flume.instrumentation.MonitorService;
-import org.apache.flume.instrumentation.util.JMXPollUtil;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink
implements MonitorService {
- private String collectorUri;
- private String protocol;
- // Key - component(instance_id)
- private Map<String, TimelineMetricsCache> metricsCaches;
- private int maxRowCacheSize;
- private int metricsSendInterval;
- private ScheduledExecutorService scheduledExecutorService;
- private long pollFrequency;
- private String hostname;
- private String port;
- private Collection<String> collectorHosts;
- private String zookeeperQuorum;
- private final static String COUNTER_METRICS_PROPERTY = "counters";
- private final Set<String> counterMetrics = new HashSet<String>();
- private int timeoutSeconds = 10;
- private boolean setInstanceId;
- private String instanceId;
- private boolean hostInMemoryAggregationEnabled;
- private int hostInMemoryAggregationPort;
- private String hostInMemoryAggregationProtocol;
-
-
- @Override
- public void start() {
- LOG.info("Starting Flume Metrics Sink");
- TimelineMetricsCollector timelineMetricsCollector = new
TimelineMetricsCollector();
- if (scheduledExecutorService == null ||
scheduledExecutorService.isShutdown() ||
scheduledExecutorService.isTerminated()) {
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
- }
- scheduledExecutorService.scheduleWithFixedDelay(timelineMetricsCollector,
0,
- pollFrequency, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void stop() {
- LOG.info("Stopping Flume Metrics Sink");
- scheduledExecutorService.shutdown();
- }
-
- @Override
- public void configure(Context context) {
- LOG.info("Context parameters " + context);
- try {
- hostname = InetAddress.getLocalHost().getHostName();
- //If not FQDN , call DNS
- if ((hostname == null) || (!hostname.contains("."))) {
- hostname = InetAddress.getLocalHost().getCanonicalHostName();
- }
- hostname = hostname.toLowerCase();
-
- } catch (UnknownHostException e) {
- LOG.error("Could not identify hostname.");
- throw new FlumeException("Could not identify hostname.", e);
- }
- Configuration configuration = new
Configuration("/flume-metrics2.properties");
- timeoutSeconds =
Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS,
- String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
- maxRowCacheSize =
Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
- String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
- metricsSendInterval =
Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
- String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
- metricsCaches = new HashMap<String, TimelineMetricsCache>();
- collectorHosts =
parseHostsStringIntoCollection(configuration.getProperty(COLLECTOR_HOSTS_PROPERTY));
- zookeeperQuorum = configuration.getProperty("zookeeper.quorum");
- protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
- port = configuration.getProperty(COLLECTOR_PORT, "6188");
- setInstanceId =
Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
- instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, "");
-
- hostInMemoryAggregationEnabled =
Boolean.getBoolean(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY,
"false"));
- hostInMemoryAggregationPort =
Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY,
"61888"));
- hostInMemoryAggregationProtocol =
configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
- // Initialize the collector write strategy
- super.init();
-
- if (protocol.contains("https") ||
hostInMemoryAggregationProtocol.contains("https")) {
- String trustStorePath =
configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
- String trustStoreType =
configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
- String trustStorePwd =
configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
- loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
- }
- collectorUri = constructTimelineMetricUri(protocol,
findPreferredCollectHost(), port);
-
- pollFrequency =
Long.parseLong(configuration.getProperty("collectionFrequency"));
-
- String[] metrics =
configuration.getProperty(COUNTER_METRICS_PROPERTY).trim().split(",");
- Collections.addAll(counterMetrics, metrics);
- }
-
- @Override
- public String getCollectorUri(String host) {
- return constructTimelineMetricUri(protocol, host, port);
- }
-
- @Override
- protected String getCollectorProtocol() {
- return protocol;
- }
-
- @Override
- protected String getCollectorPort() {
- return port;
- }
-
- @Override
- protected int getTimeoutSeconds() {
- return timeoutSeconds;
- }
-
- @Override
- protected String getZookeeperQuorum() {
- return zookeeperQuorum;
- }
-
- @Override
- protected Collection<String> getConfiguredCollectorHosts() {
- return collectorHosts;
- }
-
- @Override
- protected String getHostname() {
- return hostname;
- }
-
- @Override
- protected boolean isHostInMemoryAggregationEnabled() {
- return hostInMemoryAggregationEnabled;
- }
-
- @Override
- protected int getHostInMemoryAggregationPort() {
- return hostInMemoryAggregationPort;
- }
-
- @Override
- protected String getHostInMemoryAggregationProtocol() {
- return hostInMemoryAggregationProtocol;
- }
-
- public void setPollFrequency(long pollFrequency) {
- this.pollFrequency = pollFrequency;
- }
-
- //Test hepler method
- protected void setMetricsCaches(Map<String, TimelineMetricsCache>
metricsCaches) {
- this.metricsCaches = metricsCaches;
- }
-
- /**
- * Worker which polls JMX for all mbeans with
- * {@link javax.management.ObjectName} within the flume namespace:
- * org.apache.flume. All attributes of such beans are sent
- * to the metrics collector service.
- */
- class TimelineMetricsCollector implements Runnable {
- @Override
- public void run() {
- LOG.debug("Collecting Metrics for Flume");
- try {
- Map<String, Map<String, String>> metricsMap =
JMXPollUtil.getAllMBeans();
- long currentTimeMillis = System.currentTimeMillis();
- for (String component : metricsMap.keySet()) {
- Map<String, String> attributeMap = metricsMap.get(component);
- LOG.debug("Attributes for component " + component);
- processComponentAttributes(currentTimeMillis, component,
attributeMap);
- }
- } catch (UnableToConnectException uce) {
- LOG.warn("Unable to send metrics to collector by address:" +
uce.getConnectUrl());
- } catch (Exception e) {
- LOG.error("Unexpected error", e);
- }
- LOG.debug("Finished collecting Metrics for Flume");
- }
-
- private void processComponentAttributes(long currentTimeMillis, String
component, Map<String, String> attributeMap) throws IOException {
- List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
- if (!metricsCaches.containsKey(component)) {
- metricsCaches.put(component, new TimelineMetricsCache(maxRowCacheSize,
metricsSendInterval));
- }
- TimelineMetricsCache metricsCache = metricsCaches.get(component);
- for (String attributeName : attributeMap.keySet()) {
- String attributeValue = attributeMap.get(attributeName);
- if (NumberUtils.isNumber(attributeValue)) {
- LOG.info(attributeName + " = " + attributeValue);
- TimelineMetric timelineMetric =
createTimelineMetric(currentTimeMillis,
- component, attributeName, attributeValue);
- // Put intermediate values into the cache until it is time to send
- metricsCache.putTimelineMetric(timelineMetric,
isCounterMetric(attributeName));
-
- TimelineMetric cachedMetric =
metricsCache.getTimelineMetric(attributeName);
-
- if (cachedMetric != null) {
- metricList.add(cachedMetric);
- }
- }
- }
-
- if (!metricList.isEmpty()) {
- TimelineMetrics timelineMetrics = new TimelineMetrics();
- timelineMetrics.setMetrics(metricList);
- emitMetrics(timelineMetrics);
- }
- }
-
- private TimelineMetric createTimelineMetric(long currentTimeMillis, String
component, String attributeName, String attributeValue) {
- TimelineMetric timelineMetric = new TimelineMetric();
- timelineMetric.setMetricName(attributeName);
- timelineMetric.setHostName(hostname);
- if (setInstanceId) {
- timelineMetric.setInstanceId(instanceId + component);
- } else {
- timelineMetric.setInstanceId(component);
- }
- timelineMetric.setAppId("FLUME_HANDLER");
- timelineMetric.setStartTime(currentTimeMillis);
- timelineMetric.getMetricValues().put(currentTimeMillis,
Double.parseDouble(attributeValue));
- return timelineMetric;
- }
- }
-
- private boolean isCounterMetric(String attributeName) {
- return counterMetrics.contains(attributeName);
- }
-}
diff --git a/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java b/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
deleted file mode 100644
index 99da43f..0000000
--- a/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.flume;
-
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.resetAll;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
-
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flume.Context;
-import org.apache.flume.instrumentation.util.JMXPollUtil;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-import org.easymock.EasyMock;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({JMXPollUtil.class, Executors.class,
FlumeTimelineMetricsSink.class})
-public class FlumeTimelineMetricsSinkTest {
- @Test
- public void testNonNumericMetricMetricExclusion() throws
InterruptedException {
- FlumeTimelineMetricsSink flumeTimelineMetricsSink = new
FlumeTimelineMetricsSink();
- FlumeTimelineMetricsSink.TimelineMetricsCollector collector =
- flumeTimelineMetricsSink.new TimelineMetricsCollector();
- mockStatic(JMXPollUtil.class);
- EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
- Collections.singletonMap("component1",
Collections.singletonMap("key1", "value1"))).once();
- replay(JMXPollUtil.class);
- collector.run();
- verifyAll();
- }
-
- @Test
- public void testNumericMetricSubmission() throws InterruptedException {
- FlumeTimelineMetricsSink flumeTimelineMetricsSink = new
FlumeTimelineMetricsSink();
- FlumeTimelineMetricsSink.TimelineMetricsCollector collector =
- flumeTimelineMetricsSink.new TimelineMetricsCollector();
- mockStatic(JMXPollUtil.class);
- EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
- Collections.singletonMap("component1",
Collections.singletonMap("key1", "42"))).once();
- replay(JMXPollUtil.class);
- collector.run();
- verifyAll();
- }
-
- private TimelineMetricsCache
getTimelineMetricsCache(FlumeTimelineMetricsSink flumeTimelineMetricsSink) {
- TimelineMetricsCache timelineMetricsCache =
EasyMock.createNiceMock(TimelineMetricsCache.class);
-
flumeTimelineMetricsSink.setMetricsCaches(Collections.singletonMap("SINK",timelineMetricsCache));
- EasyMock.expect(timelineMetricsCache.getTimelineMetric("key1"))
- .andReturn(new TimelineMetric()).once();
- timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
- EasyMock.expectLastCall().once();
- return timelineMetricsCache;
- }
-
- @Test
- public void testMonitorRestart() throws InterruptedException {
- FlumeTimelineMetricsSink flumeTimelineMetricsSink = new
FlumeTimelineMetricsSink();
- TimelineMetricsCache timelineMetricsCache =
getTimelineMetricsCache(flumeTimelineMetricsSink);
- flumeTimelineMetricsSink.setPollFrequency(1);
- mockStatic(Executors.class);
- ScheduledExecutorService executor =
createNiceMock(ScheduledExecutorService.class);
- expect(Executors.newSingleThreadScheduledExecutor()).andReturn(executor);
- FlumeTimelineMetricsSink.TimelineMetricsCollector collector = anyObject();
- TimeUnit unit = anyObject();
- expect(executor.scheduleWithFixedDelay(collector, eq(0), eq(1),
unit)).andReturn(null);
- executor.shutdown();
- replay(timelineMetricsCache, Executors.class, executor);
-
- flumeTimelineMetricsSink.start();
- flumeTimelineMetricsSink.stop();
-
- verifyAll();
- }
-
- @Test
- public void testMetricsRetrievalExceptionTolerance() throws
InterruptedException {
- FlumeTimelineMetricsSink flumeTimelineMetricsSink = new
FlumeTimelineMetricsSink();
- FlumeTimelineMetricsSink.TimelineMetricsCollector collector =
- flumeTimelineMetricsSink.new TimelineMetricsCollector();
- mockStatic(JMXPollUtil.class);
- EasyMock.expect(JMXPollUtil.getAllMBeans()).
- andThrow(new RuntimeException("Failed to retrieve Flume
Properties")).once();
- replay(JMXPollUtil.class);
- collector.run();
- verifyAll();
- }
-
- @Test
- @PrepareForTest({Configuration.class, FlumeTimelineMetricsSink.class})
- public void testGettingFqdn() throws Exception {
- FlumeTimelineMetricsSink flumeTimelineMetricsSink = new
FlumeTimelineMetricsSink();
- Configuration config = createNiceMock(Configuration.class);
-
- expect(config.getProperty(anyString(), anyString()))
- .andReturn("60")
- .anyTimes();
- expect(config.getProperty(anyString()))
- .andReturn("60")
- .anyTimes();
- replay(config);
-
- PowerMock.expectNew(Configuration.class, anyString())
- .andReturn(config);
- replay(Configuration.class);
-
- // getHostName() returned FQDN
- InetAddress address = createNiceMock(InetAddress.class);
- expect(address.getHostName()).andReturn("hostname.domain").once();
- replay(address);
-
- mockStatic(InetAddress.class);
- expect(InetAddress.getLocalHost()).andReturn(address).once();
- replay(InetAddress.class);
-
- flumeTimelineMetricsSink.configure(new Context());
- verifyAll();
-
- resetAll();
-
- PowerMock.expectNew(Configuration.class, anyString())
- .andReturn(config);
- replay(Configuration.class);
-
- // getHostName() returned short hostname, getCanonicalHostName() called
- address = createNiceMock(InetAddress.class);
- expect(address.getHostName()).andReturn("hostname").once();
- expect(address.getCanonicalHostName()).andReturn("hostname.domain").once();
- replay(address);
-
- mockStatic(InetAddress.class);
- expect(InetAddress.getLocalHost()).andReturn(address).times(2);
- replay(InetAddress.class);
-
- flumeTimelineMetricsSink.configure(new Context());
- verifyAll();
-
- }
-
-}
diff --git a/pom.xml b/pom.xml
index eed8204..35ca644 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,6 @@
<modules>
<module>ambari-metrics-common</module>
<module>ambari-metrics-hadoop-sink</module>
- <module>ambari-metrics-flume-sink</module>
<module>ambari-metrics-kafka-sink</module>
<module>ambari-metrics-storm-sink</module>
<module>ambari-metrics-timelineservice</module>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]