This is an automated email from the ASF dual-hosted git repository.

jialiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-metrics.git


The following commit(s) were added to refs/heads/master by this push:
     new b1ac7d7  AMBARI-26492: Drop ambari-metrics-flume-sink (#165)
b1ac7d7 is described below

commit b1ac7d78d73818ae896bff279ef7d3f27ef76741
Author: Jhen-Yung Hsu <[email protected]>
AuthorDate: Tue May 13 08:47:25 2025 +0800

    AMBARI-26492: Drop ambari-metrics-flume-sink (#165)
    
    Co-authored-by: yungh <[email protected]>
---
 ambari-metrics-assembly/pom.xml                    |  29 ---
 ambari-metrics-assembly/src/main/assembly/sink.xml |   9 -
 .../src/main/package/deb/control/postinst          |   7 +-
 .../src/main/package/rpm/sink/postinstall.sh       |   7 +-
 ambari-metrics-flume-sink/pom.xml                  | 147 -----------
 .../src/main/assemblies/empty.xml                  |  21 --
 .../src/main/assemblies/jar-with-common.xml        |  35 ---
 .../src/main/conf/flume-metrics2.properties.j2     |  31 ---
 .../sink/flume/FlumeTimelineMetricsSink.java       | 272 ---------------------
 .../sink/flume/FlumeTimelineMetricsSinkTest.java   | 172 -------------
 pom.xml                                            |   1 -
 11 files changed, 4 insertions(+), 727 deletions(-)

diff --git a/ambari-metrics-assembly/pom.xml b/ambari-metrics-assembly/pom.xml
index 78bb467..f3b2412 100644
--- a/ambari-metrics-assembly/pom.xml
+++ b/ambari-metrics-assembly/pom.xml
@@ -39,7 +39,6 @@
     <grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir>
     
<hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
     
<storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
-    
<flume-sink.dir>${project.basedir}/../ambari-metrics-flume-sink</flume-sink.dir>
     
<kafka-sink.dir>${project.basedir}/../ambari-metrics-kafka-sink</kafka-sink.dir>
     <python.ver>python3 &gt;= 3.0</python.ver>
     <python.devel>python3-devel</python.devel>
@@ -53,7 +52,6 @@
     <rpm.arch>x86_64</rpm.arch>
     
<hadoop.sink.jar>ambari-metrics-hadoop-sink-with-common-${project.version}.jar</hadoop.sink.jar>
     
<storm.sink.jar>ambari-metrics-storm-sink-with-common-${project.version}.jar</storm.sink.jar>
-    
<flume.sink.jar>ambari-metrics-flume-sink-with-common-${project.version}.jar</flume.sink.jar>
     
<kafka.sink.jar>ambari-metrics-kafka-sink-with-common-${project.version}.jar</kafka.sink.jar>
   </properties>
 
@@ -467,14 +465,6 @@
                         </source>
                       </sources>
                     </mapping>
-                    <mapping>
-                      <directory>/usr/lib/flume/lib</directory>
-                      <sources>
-                        <source>
-                          
<location>${flume-sink.dir}/target/ambari-metrics-flume-sink-with-common-${project.version}.jar</location>
-                        </source>
-                      </sources>
-                    </mapping>
                     <mapping>
                       <directory>/usr/lib/storm/lib</directory>
                       <sources>
@@ -815,7 +805,6 @@
                     <path>/usr/lib/ambari-metrics-hadoop-sink</path>
                     <path>/usr/lib/ambari-metrics-kafka-sink</path>
                     <path>/usr/lib/ambari-metrics-kafka-sink/lib</path>
-                    <path>/usr/lib/flume/lib</path>
                     <path>/usr/lib/storm/lib</path>
                   </paths>
                 </data>
@@ -1116,19 +1105,6 @@
                   </mapper>
                 </data>
 
-                <!-- flume sink -->
-
-                <data>
-                  <src>${flume-sink.dir}/target/${flume.sink.jar}</src>
-                  <type>file</type>
-                  <mapper>
-                    <type>perm</type>
-                    <filemode>644</filemode>
-                    <dirmode>755</dirmode>
-                    <prefix>/usr/lib/flume/lib</prefix>
-                  </mapper>
-                </data>
-
                 <!-- storm sinks -->
 
                 <data>
@@ -1259,11 +1235,6 @@
       <artifactId>ambari-metrics-hadoop-sink</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.ambari</groupId>
-      <artifactId>ambari-metrics-flume-sink</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <dependency>
       <groupId>org.apache.ambari</groupId>
       <artifactId>ambari-metrics-storm-sink</artifactId>
diff --git a/ambari-metrics-assembly/src/main/assembly/sink.xml 
b/ambari-metrics-assembly/src/main/assembly/sink.xml
index 1400c7b..f7fdf0f 100644
--- a/ambari-metrics-assembly/src/main/assembly/sink.xml
+++ b/ambari-metrics-assembly/src/main/assembly/sink.xml
@@ -30,10 +30,6 @@
       <directory>${hadoop-sink.dir}/src/main/conf</directory>
       <outputDirectory>hadoop-sink/conf</outputDirectory>
     </fileSet>
-    <fileSet>
-      <directory>${flume-sink.dir}/src/main/conf</directory>
-      <outputDirectory>hadoop-sink/conf</outputDirectory>
-    </fileSet>
     <fileSet>
       <directory>${storm-sink.dir}/src/main/conf</directory>
       <outputDirectory>hadoop-sink/conf</outputDirectory>
@@ -50,11 +46,6 @@
       
<source>${hadoop-sink.dir}/target/ambari-metrics-hadoop-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
-    <file>
-      <fileMode>644</fileMode>
-      
<source>${flume-sink.dir}/target/ambari-metrics-flume-sink-with-common-${project.version}.jar</source>
-      <outputDirectory>hadoop-sink</outputDirectory>
-    </file>
     <file>
       <fileMode>644</fileMode>
       
<source>${storm-sink.dir}/target/ambari-metrics-storm-sink-with-common-${project.version}.jar</source>
diff --git a/ambari-metrics-assembly/src/main/package/deb/control/postinst 
b/ambari-metrics-assembly/src/main/package/deb/control/postinst
index e75d557..3d68b05 100644
--- a/ambari-metrics-assembly/src/main/package/deb/control/postinst
+++ b/ambari-metrics-assembly/src/main/package/deb/control/postinst
@@ -17,9 +17,6 @@
 
HADOOP_LINK_NAME="/usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar"
 HADOOP_SINK_JAR="/usr/lib/ambari-metrics-hadoop-sink/${hadoop.sink.jar}"
 
-FLUME_LINK_NAME="/usr/lib/flume/lib/ambari-metrics-flume-sink.jar"
-FLUME_SINK_JAR="/usr/lib/flume/lib/${flume.sink.jar}"
-
 
KAFKA_LINK_NAME="/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar"
 KAFKA_SINK_JAR="/usr/lib/ambari-metrics-kafka-sink/${kafka.sink.jar}"
 
@@ -27,8 +24,8 @@ 
KAFKA_SINK_JAR="/usr/lib/ambari-metrics-kafka-sink/${kafka.sink.jar}"
 #STORM_SINK_JAR="/usr/lib/storm/lib/${storm.sink.jar}"
 #STORM_LINK_NAME="/usr/lib/storm/lib/ambari-metrics-storm-sink.jar"
 
-JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR} ${KAFKA_SINK_JAR})
-LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME} ${KAFKA_LINK_NAME})
+JARS=(${HADOOP_SINK_JAR} ${KAFKA_SINK_JAR})
+LINKS=(${HADOOP_LINK_NAME} ${KAFKA_LINK_NAME})
 
 for index in ${!LINKS[*]}
 do
diff --git a/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh 
b/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh
index e75d557..3d68b05 100644
--- a/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh
+++ b/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh
@@ -17,9 +17,6 @@
 
HADOOP_LINK_NAME="/usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar"
 HADOOP_SINK_JAR="/usr/lib/ambari-metrics-hadoop-sink/${hadoop.sink.jar}"
 
-FLUME_LINK_NAME="/usr/lib/flume/lib/ambari-metrics-flume-sink.jar"
-FLUME_SINK_JAR="/usr/lib/flume/lib/${flume.sink.jar}"
-
 
KAFKA_LINK_NAME="/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar"
 KAFKA_SINK_JAR="/usr/lib/ambari-metrics-kafka-sink/${kafka.sink.jar}"
 
@@ -27,8 +24,8 @@ 
KAFKA_SINK_JAR="/usr/lib/ambari-metrics-kafka-sink/${kafka.sink.jar}"
 #STORM_SINK_JAR="/usr/lib/storm/lib/${storm.sink.jar}"
 #STORM_LINK_NAME="/usr/lib/storm/lib/ambari-metrics-storm-sink.jar"
 
-JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR} ${KAFKA_SINK_JAR})
-LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME} ${KAFKA_LINK_NAME})
+JARS=(${HADOOP_SINK_JAR} ${KAFKA_SINK_JAR})
+LINKS=(${HADOOP_LINK_NAME} ${KAFKA_LINK_NAME})
 
 for index in ${!LINKS[*]}
 do
diff --git a/ambari-metrics-flume-sink/pom.xml 
b/ambari-metrics-flume-sink/pom.xml
deleted file mode 100644
index 6ac7f68..0000000
--- a/ambari-metrics-flume-sink/pom.xml
+++ /dev/null
@@ -1,147 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
-                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <parent>
-    <artifactId>ambari-metrics</artifactId>
-    <groupId>org.apache.ambari</groupId>
-    <version>3.1.0-SNAPSHOT</version>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>ambari-metrics-flume-sink</artifactId>
-  <version>3.1.0-SNAPSHOT</version>
-  <name>Ambari Metrics Flume Sink</name>
-  <packaging>jar</packaging>
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <executions>
-          <execution>
-            <configuration>
-              <descriptors>
-                
<descriptor>src/main/assemblies/jar-with-common.xml</descriptor>
-              </descriptors>
-              <attach>false</attach>
-              <tarLongFileMode>gnu</tarLongFileMode>
-              <appendAssemblyId>false</appendAssemblyId>
-              
<finalName>${project.artifactId}-with-common-${project.version}</finalName>
-            </configuration>
-            <id>build-jar</id>
-            <phase>package</phase>
-            <goals>
-              <goal>single</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>3.2</version>
-      </plugin>
-      <plugin>
-        <groupId>com.github.goldin</groupId>
-        <artifactId>copy-maven-plugin</artifactId>
-        <version>0.2.5</version>
-        <executions>
-          <execution>
-            <id>create-archive</id>
-            <phase>none</phase>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.vafer</groupId>
-        <artifactId>jdeb</artifactId>
-        <executions>
-          <execution>
-            <!--Stub execution on direct plugin call - workaround for ambari deb build process-->
-            <id>stub-execution</id>
-            <phase>none</phase>
-            <goals>
-              <goal>jdeb</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <skip>true</skip>
-          <attach>false</attach>
-          <submodules>false</submodules>
-          
<controlDir>${project.basedir}/../src/main/package/deb/control</controlDir>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-core</artifactId>
-      <version>1.5.1</version>
-      <scope>compile</scope>
-      <exclusions>
-        <exclusion>
-          <artifactId>libthrift</artifactId>
-          <groupId>org.apache.thrift</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>jetty-util</artifactId>
-          <groupId>org.mortbay.jetty</groupId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.ambari</groupId>
-      <artifactId>ambari-metrics-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-mapper-asl</artifactId>
-      <version>1.9.13</version>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-      <version>4.10</version>
-    </dependency>
-    <dependency>
-      <groupId>org.easymock</groupId>
-      <artifactId>easymock</artifactId>
-      <version>3.2</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-api-easymock</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-module-junit4</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>18.0</version>
-    </dependency>
-  </dependencies>
-</project>
diff --git a/ambari-metrics-flume-sink/src/main/assemblies/empty.xml 
b/ambari-metrics-flume-sink/src/main/assemblies/empty.xml
deleted file mode 100644
index 35738b1..0000000
--- a/ambari-metrics-flume-sink/src/main/assemblies/empty.xml
+++ /dev/null
@@ -1,21 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-  
-       http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<assembly>
-    <id>empty</id>
-    <formats/>
-</assembly>
diff --git a/ambari-metrics-flume-sink/src/main/assemblies/jar-with-common.xml 
b/ambari-metrics-flume-sink/src/main/assemblies/jar-with-common.xml
deleted file mode 100644
index bfd8b29..0000000
--- a/ambari-metrics-flume-sink/src/main/assemblies/jar-with-common.xml
+++ /dev/null
@@ -1,35 +0,0 @@
-<?xml version="1.0"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<assembly>
-  <id>jar-with-common</id>
-  <formats>
-    <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <dependencySets>
-    <dependencySet>
-      <fileMode>644</fileMode>
-      <outputDirectory>/</outputDirectory>
-      <unpack>true</unpack>
-      <includes>
-        <include>org.apache.ambari:ambari-metrics-common</include>
-        <include>org.apache.ambari:ambari-metrics-flume-sink</include>
-      </includes>
-    </dependencySet>
-  </dependencySets>
-</assembly>
diff --git 
a/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2 
b/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
deleted file mode 100644
index 58c5f09..0000000
--- a/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
+++ /dev/null
@@ -1,31 +0,0 @@
-{#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-collector=http://localhost:6188
-collectionFrequency=60000
-maxRowCacheSize=10000
-sendInterval={{metrics_report_interval}}000
-clusterReporterAppId=nimbus
-host_in_memory_aggregation = {{host_in_memory_aggregation}}
-host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
-{% if is_aggregation_https_enabled %}
-host_in_memory_aggregation_protocol = {{host_in_memory_aggregation_protocol}}
-{% endif %}
-
-# Metric names having type COUNTER
-counters=EventTakeSuccessCount,EventPutSuccessCount,EventTakeAttemptCount,EventPutAttemptCount
diff --git 
a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
 
b/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
deleted file mode 100644
index 720c371..0000000
--- 
a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.flume;
-
-import org.apache.commons.lang.math.NumberUtils;
-import org.apache.flume.Context;
-import org.apache.flume.FlumeException;
-import org.apache.flume.instrumentation.MonitorService;
-import org.apache.flume.instrumentation.util.JMXPollUtil;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink 
implements MonitorService {
-  private String collectorUri;
-  private String protocol;
-  // Key - component(instance_id)
-  private Map<String, TimelineMetricsCache> metricsCaches;
-  private int maxRowCacheSize;
-  private int metricsSendInterval;
-  private ScheduledExecutorService scheduledExecutorService;
-  private long pollFrequency;
-  private String hostname;
-  private String port;
-  private Collection<String> collectorHosts;
-  private String zookeeperQuorum;
-  private final static String COUNTER_METRICS_PROPERTY = "counters";
-  private final Set<String> counterMetrics = new HashSet<String>();
-  private int timeoutSeconds = 10;
-  private boolean setInstanceId;
-  private String instanceId;
-  private boolean hostInMemoryAggregationEnabled;
-  private int hostInMemoryAggregationPort;
-  private String hostInMemoryAggregationProtocol;
-
-
-  @Override
-  public void start() {
-    LOG.info("Starting Flume Metrics Sink");
-    TimelineMetricsCollector timelineMetricsCollector = new 
TimelineMetricsCollector();
-    if (scheduledExecutorService == null || 
scheduledExecutorService.isShutdown() || 
scheduledExecutorService.isTerminated()) {
-      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
-    }
-    scheduledExecutorService.scheduleWithFixedDelay(timelineMetricsCollector, 
0,
-        pollFrequency, TimeUnit.MILLISECONDS);
-  }
-
-  @Override
-  public void stop() {
-    LOG.info("Stopping Flume Metrics Sink");
-    scheduledExecutorService.shutdown();
-  }
-
-  @Override
-  public void configure(Context context) {
-    LOG.info("Context parameters " + context);
-    try {
-      hostname = InetAddress.getLocalHost().getHostName();
-      //If not FQDN , call  DNS
-      if ((hostname == null) || (!hostname.contains("."))) {
-        hostname = InetAddress.getLocalHost().getCanonicalHostName();
-      }
-      hostname = hostname.toLowerCase();
-
-    } catch (UnknownHostException e) {
-      LOG.error("Could not identify hostname.");
-      throw new FlumeException("Could not identify hostname.", e);
-    }
-    Configuration configuration = new 
Configuration("/flume-metrics2.properties");
-    timeoutSeconds = 
Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS,
-        String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
-    maxRowCacheSize = 
Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
-        String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
-    metricsSendInterval = 
Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
-        String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
-    metricsCaches = new HashMap<String, TimelineMetricsCache>();
-    collectorHosts = 
parseHostsStringIntoCollection(configuration.getProperty(COLLECTOR_HOSTS_PROPERTY));
-    zookeeperQuorum = configuration.getProperty("zookeeper.quorum");
-    protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
-    port = configuration.getProperty(COLLECTOR_PORT, "6188");
-    setInstanceId = 
Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
-    instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, "");
-
-    hostInMemoryAggregationEnabled = 
Boolean.getBoolean(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY,
 "false"));
-    hostInMemoryAggregationPort = 
Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY,
 "61888"));
-    hostInMemoryAggregationProtocol = 
configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
-    // Initialize the collector write strategy
-    super.init();
-
-    if (protocol.contains("https") || 
hostInMemoryAggregationProtocol.contains("https")) {
-      String trustStorePath = 
configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
-      String trustStoreType = 
configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
-      String trustStorePwd = 
configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
-      loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
-    }
-    collectorUri = constructTimelineMetricUri(protocol, 
findPreferredCollectHost(), port);
-
-    pollFrequency = 
Long.parseLong(configuration.getProperty("collectionFrequency"));
-
-    String[] metrics = 
configuration.getProperty(COUNTER_METRICS_PROPERTY).trim().split(",");
-    Collections.addAll(counterMetrics, metrics);
-  }
-
-  @Override
-  public String getCollectorUri(String host) {
-    return constructTimelineMetricUri(protocol, host, port);
-  }
-
-  @Override
-  protected String getCollectorProtocol() {
-    return protocol;
-  }
-
-  @Override
-  protected String getCollectorPort() {
-    return port;
-  }
-
-  @Override
-  protected int getTimeoutSeconds() {
-    return timeoutSeconds;
-  }
-
-  @Override
-  protected String getZookeeperQuorum() {
-    return zookeeperQuorum;
-  }
-
-  @Override
-  protected Collection<String> getConfiguredCollectorHosts() {
-    return collectorHosts;
-  }
-
-  @Override
-  protected String getHostname() {
-    return hostname;
-  }
-
-  @Override
-  protected boolean isHostInMemoryAggregationEnabled() {
-    return hostInMemoryAggregationEnabled;
-  }
-
-  @Override
-  protected int getHostInMemoryAggregationPort() {
-    return hostInMemoryAggregationPort;
-  }
-
-  @Override
-  protected String getHostInMemoryAggregationProtocol() {
-    return hostInMemoryAggregationProtocol;
-  }
-
-  public void setPollFrequency(long pollFrequency) {
-    this.pollFrequency = pollFrequency;
-  }
-
-  //Test hepler method
-  protected void setMetricsCaches(Map<String, TimelineMetricsCache> 
metricsCaches) {
-    this.metricsCaches = metricsCaches;
-  }
-
-  /**
-   * Worker which polls JMX for all mbeans with
-   * {@link javax.management.ObjectName} within the flume namespace:
-   * org.apache.flume. All attributes of such beans are sent
-   * to the metrics collector service.
-   */
-  class TimelineMetricsCollector implements Runnable {
-    @Override
-    public void run() {
-      LOG.debug("Collecting Metrics for Flume");
-      try {
-        Map<String, Map<String, String>> metricsMap = 
JMXPollUtil.getAllMBeans();
-        long currentTimeMillis = System.currentTimeMillis();
-        for (String component : metricsMap.keySet()) {
-          Map<String, String> attributeMap = metricsMap.get(component);
-          LOG.debug("Attributes for component " + component);
-          processComponentAttributes(currentTimeMillis, component, 
attributeMap);
-        }
-      } catch (UnableToConnectException uce) {
-        LOG.warn("Unable to send metrics to collector by address:" + 
uce.getConnectUrl());
-      } catch (Exception e) {
-        LOG.error("Unexpected error", e);
-      }
-      LOG.debug("Finished collecting Metrics for Flume");
-    }
-
-    private void processComponentAttributes(long currentTimeMillis, String 
component, Map<String, String> attributeMap) throws IOException {
-      List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
-      if (!metricsCaches.containsKey(component)) {
-        metricsCaches.put(component, new TimelineMetricsCache(maxRowCacheSize, 
metricsSendInterval));
-      }
-      TimelineMetricsCache metricsCache = metricsCaches.get(component);
-      for (String attributeName : attributeMap.keySet()) {
-        String attributeValue = attributeMap.get(attributeName);
-        if (NumberUtils.isNumber(attributeValue)) {
-          LOG.info(attributeName + " = " + attributeValue);
-          TimelineMetric timelineMetric = 
createTimelineMetric(currentTimeMillis,
-              component, attributeName, attributeValue);
-          // Put intermediate values into the cache until it is time to send
-          metricsCache.putTimelineMetric(timelineMetric, 
isCounterMetric(attributeName));
-
-          TimelineMetric cachedMetric = 
metricsCache.getTimelineMetric(attributeName);
-
-          if (cachedMetric != null) {
-            metricList.add(cachedMetric);
-          }
-        }
-      }
-
-      if (!metricList.isEmpty()) {
-        TimelineMetrics timelineMetrics = new TimelineMetrics();
-        timelineMetrics.setMetrics(metricList);
-        emitMetrics(timelineMetrics);
-      }
-    }
-
-    private TimelineMetric createTimelineMetric(long currentTimeMillis, String 
component, String attributeName, String attributeValue) {
-      TimelineMetric timelineMetric = new TimelineMetric();
-      timelineMetric.setMetricName(attributeName);
-      timelineMetric.setHostName(hostname);
-      if (setInstanceId) {
-        timelineMetric.setInstanceId(instanceId + component);
-      } else {
-        timelineMetric.setInstanceId(component);
-      }
-      timelineMetric.setAppId("FLUME_HANDLER");
-      timelineMetric.setStartTime(currentTimeMillis);
-      timelineMetric.getMetricValues().put(currentTimeMillis, 
Double.parseDouble(attributeValue));
-      return timelineMetric;
-    }
-  }
-
-  private boolean isCounterMetric(String attributeName) {
-    return counterMetrics.contains(attributeName);
-  }
-}
diff --git 
a/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
 
b/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
deleted file mode 100644
index 99da43f..0000000
--- 
a/ambari-metrics-flume-sink/src/test/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSinkTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.flume;
-
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.resetAll;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
-
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flume.Context;
-import org.apache.flume.instrumentation.util.JMXPollUtil;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-import org.easymock.EasyMock;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({JMXPollUtil.class, Executors.class, 
FlumeTimelineMetricsSink.class})
-public class FlumeTimelineMetricsSinkTest {
-  @Test
-  public void testNonNumericMetricMetricExclusion() throws 
InterruptedException {
-    FlumeTimelineMetricsSink flumeTimelineMetricsSink = new 
FlumeTimelineMetricsSink();
-    FlumeTimelineMetricsSink.TimelineMetricsCollector collector =
-      flumeTimelineMetricsSink.new TimelineMetricsCollector();
-    mockStatic(JMXPollUtil.class);
-    EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
-        Collections.singletonMap("component1", 
Collections.singletonMap("key1", "value1"))).once();
-    replay(JMXPollUtil.class);
-    collector.run();
-    verifyAll();
-  }
-
-  @Test
-  public void testNumericMetricSubmission() throws InterruptedException {
-    FlumeTimelineMetricsSink flumeTimelineMetricsSink = new 
FlumeTimelineMetricsSink();
-    FlumeTimelineMetricsSink.TimelineMetricsCollector collector =
-      flumeTimelineMetricsSink.new TimelineMetricsCollector();
-    mockStatic(JMXPollUtil.class);
-    EasyMock.expect(JMXPollUtil.getAllMBeans()).andReturn(
-        Collections.singletonMap("component1", 
Collections.singletonMap("key1", "42"))).once();
-    replay(JMXPollUtil.class);
-    collector.run();
-    verifyAll();
-  }
-
-  private TimelineMetricsCache 
getTimelineMetricsCache(FlumeTimelineMetricsSink flumeTimelineMetricsSink) {
-    TimelineMetricsCache timelineMetricsCache = 
EasyMock.createNiceMock(TimelineMetricsCache.class);
-    
flumeTimelineMetricsSink.setMetricsCaches(Collections.singletonMap("SINK",timelineMetricsCache));
-    EasyMock.expect(timelineMetricsCache.getTimelineMetric("key1"))
-        .andReturn(new TimelineMetric()).once();
-    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
-    EasyMock.expectLastCall().once();
-    return timelineMetricsCache;
-  }
-
-  @Test
-  public void testMonitorRestart() throws InterruptedException {
-    FlumeTimelineMetricsSink flumeTimelineMetricsSink = new 
FlumeTimelineMetricsSink();
-    TimelineMetricsCache timelineMetricsCache = 
getTimelineMetricsCache(flumeTimelineMetricsSink);
-    flumeTimelineMetricsSink.setPollFrequency(1);
-    mockStatic(Executors.class);
-    ScheduledExecutorService executor = 
createNiceMock(ScheduledExecutorService.class);
-    expect(Executors.newSingleThreadScheduledExecutor()).andReturn(executor);
-    FlumeTimelineMetricsSink.TimelineMetricsCollector collector = anyObject();
-    TimeUnit unit = anyObject();
-    expect(executor.scheduleWithFixedDelay(collector, eq(0), eq(1), 
unit)).andReturn(null);
-    executor.shutdown();
-    replay(timelineMetricsCache, Executors.class, executor);
-
-    flumeTimelineMetricsSink.start();
-    flumeTimelineMetricsSink.stop();
-
-    verifyAll();
-  }
-
-  @Test
-  public void testMetricsRetrievalExceptionTolerance() throws 
InterruptedException {
-    FlumeTimelineMetricsSink flumeTimelineMetricsSink = new 
FlumeTimelineMetricsSink();
-    FlumeTimelineMetricsSink.TimelineMetricsCollector collector =
-      flumeTimelineMetricsSink.new TimelineMetricsCollector();
-    mockStatic(JMXPollUtil.class);
-    EasyMock.expect(JMXPollUtil.getAllMBeans()).
-        andThrow(new RuntimeException("Failed to retrieve Flume 
Properties")).once();
-    replay(JMXPollUtil.class);
-    collector.run();
-    verifyAll();
-  }
-
-  @Test
-  @PrepareForTest({Configuration.class, FlumeTimelineMetricsSink.class})
-  public void testGettingFqdn() throws Exception {
-    FlumeTimelineMetricsSink flumeTimelineMetricsSink = new 
FlumeTimelineMetricsSink();
-    Configuration config = createNiceMock(Configuration.class);
-
-    expect(config.getProperty(anyString(), anyString()))
-      .andReturn("60")
-      .anyTimes();
-    expect(config.getProperty(anyString()))
-      .andReturn("60")
-      .anyTimes();
-    replay(config);
-
-    PowerMock.expectNew(Configuration.class, anyString())
-      .andReturn(config);
-    replay(Configuration.class);
-
-    // getHostName() returned FQDN
-    InetAddress address = createNiceMock(InetAddress.class);
-    expect(address.getHostName()).andReturn("hostname.domain").once();
-    replay(address);
-
-    mockStatic(InetAddress.class);
-    expect(InetAddress.getLocalHost()).andReturn(address).once();
-    replay(InetAddress.class);
-
-    flumeTimelineMetricsSink.configure(new Context());
-    verifyAll();
-
-    resetAll();
-
-    PowerMock.expectNew(Configuration.class, anyString())
-      .andReturn(config);
-    replay(Configuration.class);
-
-    // getHostName() returned short hostname, getCanonicalHostName() called
-    address = createNiceMock(InetAddress.class);
-    expect(address.getHostName()).andReturn("hostname").once();
-    expect(address.getCanonicalHostName()).andReturn("hostname.domain").once();
-    replay(address);
-
-    mockStatic(InetAddress.class);
-    expect(InetAddress.getLocalHost()).andReturn(address).times(2);
-    replay(InetAddress.class);
-
-    flumeTimelineMetricsSink.configure(new Context());
-    verifyAll();
-
-  }
-
-}
diff --git a/pom.xml b/pom.xml
index eed8204..35ca644 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,6 @@
   <modules>
     <module>ambari-metrics-common</module>
     <module>ambari-metrics-hadoop-sink</module>
-    <module>ambari-metrics-flume-sink</module>
     <module>ambari-metrics-kafka-sink</module>
     <module>ambari-metrics-storm-sink</module>
     <module>ambari-metrics-timelineservice</module>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to