Repository: hadoop
Updated Branches:
  refs/heads/trunk 2fc2b5053 -> b59e43411
HADOOP-10949. metrics2 sink plugin for Apache Kafka (Babak Behzad via aw)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b59e4341
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b59e4341
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b59e4341

Branch: refs/heads/trunk
Commit: b59e434116a54464efa91c6e81dcd2eb94fdf1f8
Parents: 2fc2b50
Author: Allen Wittenauer <a...@apache.org>
Authored: Mon Oct 5 14:02:07 2015 -0700
Committer: Allen Wittenauer <a...@apache.org>
Committed: Mon Oct 5 14:02:07 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   2 +
 hadoop-project/pom.xml                          |   3 +-
 hadoop-tools/hadoop-kafka/pom.xml               | 129 +++++++++++++
 .../apache/hadoop/metrics2/sink/KafkaSink.java  | 193 +++++++++++++++++++
 .../hadoop/metrics2/impl/TestKafkaMetrics.java  | 184 ++++++++++++++++++
 hadoop-tools/pom.xml                            |   1 +
 6 files changed, 511 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b59e4341/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 72d4a47..04725ba 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -56,6 +56,8 @@ Trunk (Unreleased)
 
     HADOOP-10854. unit tests for the shell scripts (aw)
 
+    HADOOP-10949. metrics2 sink plugin for Apache Kafka (Babak Behzad via aw)
+
   IMPROVEMENTS
 
     HADOOP-11203. Allow distcp to accept bandwidth in fractional MegaBytes

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b59e4341/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index fd33246..a62e45b 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -45,8 +45,9 @@
     <!-- Version number for xerces used by JDiff -->
     <xerces.jdiff.version>2.11.0</xerces.jdiff.version>
 
-    <hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
+    <kafka.version>0.8.2.1</kafka.version>
 
+    <hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
     <commons-daemon.version>1.0.13</commons-daemon.version>
     <test.build.dir>${project.build.directory}/test-dir</test.build.dir>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b59e4341/hadoop-tools/hadoop-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-kafka/pom.xml b/hadoop-tools/hadoop-kafka/pom.xml
new file mode 100644
index 0000000..75405e1
--- /dev/null
+++ b/hadoop-tools/hadoop-kafka/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../../hadoop-project</relativePath>
+  </parent>
+  <artifactId>hadoop-kafka</artifactId>
+  <version>3.0.0-SNAPSHOT</version>
+  <name>Apache Hadoop Kafka Library support</name>
+  <description>
+    This module contains code to support integration with Kafka.
+    It also declares the dependencies needed to work with Kafka.
+  </description>
+  <packaging>jar</packaging>
+
+  <properties>
+    <file.encoding>UTF-8</file.encoding>
+    <downloadSources>true</downloadSources>
+  </properties>
+
+  <profiles>
+    <profile>
+      <id>tests-off</id>
+      <activation>
+        <file>
+          <missing>src/test/resources/auth-keys.xml</missing>
+        </file>
+      </activation>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+    <profile>
+      <id>tests-on</id>
+      <activation>
+        <file>
+          <exists>src/test/resources/auth-keys.xml</exists>
+        </file>
+      </activation>
+      <properties>
+        <maven.test.skip>false</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <findbugsXmlOutput>true</findbugsXmlOutput>
+          <xmlOutput>true</xmlOutput>
+          <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml
+          </excludeFilterFile>
+          <effort>Max</effort>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-project-info-reports-plugin</artifactId>
+        <configuration>
+          <dependencyDetailsEnabled>false</dependencyDetailsEnabled>
+          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <!-- see ../../hadoop-project/pom.xml for versions -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>${kafka.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.xerial.snappy</groupId>
+          <artifactId>snappy-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
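[Editor's note: per the tests-off/tests-on profiles above, the module's unit
tests are skipped unless src/test/resources/auth-keys.xml exists. A minimal
sketch of running the new test locally; the commands are illustrative and not
part of this patch:

  # create the marker file that activates the tests-on profile
  touch hadoop-tools/hadoop-kafka/src/test/resources/auth-keys.xml
  cd hadoop-tools/hadoop-kafka
  mvn test -Dtest=TestKafkaMetrics]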
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b59e4341/hadoop-tools/hadoop-kafka/src/main/java/org/apache/hadoop/metrics2/sink/KafkaSink.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-kafka/src/main/java/org/apache/hadoop/metrics2/sink/KafkaSink.java b/hadoop-tools/hadoop-kafka/src/main/java/org/apache/hadoop/metrics2/sink/KafkaSink.java
new file mode 100644
index 0000000..d8e5bd8
--- /dev/null
+++ b/hadoop-tools/hadoop-kafka/src/main/java/org/apache/hadoop/metrics2/sink/KafkaSink.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import com.google.common.base.Strings;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * A metrics sink that writes to a Kafka broker. It requires a broker_list
+ * and a topic to be set in the metrics2 configuration file. broker_list
+ * must be a comma-separated list of Kafka broker host:port pairs, and
+ * topic must name the single Kafka topic the metrics are written to.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KafkaSink implements MetricsSink, Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
+  public static final String BROKER_LIST = "broker_list";
+  public static final String TOPIC = "topic";
+
+  private String hostname = null;
+  private String brokerList = null;
+  private String topic = null;
+  private Producer<Integer, byte[]> producer = null;
+
+  public void setProducer(Producer<Integer, byte[]> p) {
+    this.producer = p;
+  }
+
+  @Override
+  public void init(SubsetConfiguration conf) {
+    // Get the Kafka broker configuration.
+    Properties props = new Properties();
+    brokerList = conf.getString(BROKER_LIST);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Kafka brokers: " + brokerList);
+    }
+    props.put("bootstrap.servers", brokerList);
+
+    // Get the Kafka topic configuration.
+    topic = conf.getString(TOPIC);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Kafka topic: " + topic);
+    }
+    if (Strings.isNullOrEmpty(topic)) {
+      throw new MetricsException("Kafka topic cannot be null or empty");
+    }
+
+    // Set the rest of the Kafka configuration.
+ props.put("key.serializer", + "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("value.serializer", + "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("request.required.acks", "0"); + + // Set the hostname once and use it in every message. + hostname = "null"; + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (Exception e) { + LOG.warn("Error getting Hostname, going to continue"); + } + + try { + // Create the producer object. + producer = new KafkaProducer<Integer, byte[]>(props); + } catch (Exception e) { + throw new MetricsException("Error creating Producer, " + brokerList, e); + } + } + + @Override + public void putMetrics(MetricsRecord record) { + + if (producer == null) { + throw new MetricsException("Producer in KafkaSink is null!"); + } + + // Create the json object. + StringBuilder jsonLines = new StringBuilder(); + + Long timestamp = record.timestamp(); + Date currDate = new Date(timestamp); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + SimpleDateFormat timeFormat = new SimpleDateFormat("hh:mm:ss"); + String date = dateFormat.format(currDate); + String time = timeFormat.format(currDate); + + // Collect datapoints and populate the json object. + jsonLines.append("{\"hostname\": \"" + hostname); + jsonLines.append("\", \"timestamp\": " + timestamp); + jsonLines.append(", \"date\": \"" + date); + jsonLines.append("\",\"time\": \"" + time); + jsonLines.append("\",\"name\": \"" + record.name() + "\" "); + for (MetricsTag tag : record.tags()) { + jsonLines.append( + ", \"" + tag.name().toString().replaceAll("[\\p{Cc}]", "") + "\": "); + jsonLines.append(" \"" + tag.value().toString() + "\""); + } + for (AbstractMetric metric : record.metrics()) { + jsonLines.append(", \"" + + metric.name().toString().replaceAll("[\\p{Cc}]", "") + "\": "); + jsonLines.append(" \"" + metric.value().toString() + "\""); + } + jsonLines.append("}"); + LOG.debug("kafka message: " + jsonLines.toString()); + + // Create the record to be sent from the json. + ProducerRecord<Integer, byte[]> data = new ProducerRecord<Integer, byte[]>( + topic, jsonLines.toString().getBytes(Charset.forName("UTF-8"))); + + // Send the data to the Kafka broker. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b59e4341/hadoop-tools/hadoop-kafka/src/test/java/org/apache/hadoop/metrics2/impl/TestKafkaMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-kafka/src/test/java/org/apache/hadoop/metrics2/impl/TestKafkaMetrics.java b/hadoop-tools/hadoop-kafka/src/test/java/org/apache/hadoop/metrics2/impl/TestKafkaMetrics.java
new file mode 100644
index 0000000..3a65868
--- /dev/null
+++ b/hadoop-tools/hadoop-kafka/src/test/java/org/apache/hadoop/metrics2/impl/TestKafkaMetrics.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricType;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+import org.apache.hadoop.metrics2.sink.KafkaSink;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * This tests that the KafkaSink properly formats the Kafka message.
+ */
+public class TestKafkaMetrics {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestKafkaMetrics.class);
+  private KafkaSink kafkaSink;
+
+  enum KafkaMetricsInfo implements MetricsInfo {
+    KafkaMetrics("Kafka related metrics etc."),
+    KafkaCounter("Kafka counter."),
+    KafkaTag("Kafka tag.");
+
+    private final String desc;
+
+    KafkaMetricsInfo(String desc) {
+      this.desc = desc;
+    }
+
+    @Override
+    public String description() {
+      return desc;
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this).add("name", name())
+          .add("description", desc).toString();
+    }
+  }
+
+  @Test
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public void testPutMetrics() throws Exception {
+    // Create a record by mocking the MetricsRecord class.
+    MetricsRecord record = mock(MetricsRecord.class);
+    when(record.tags()).thenReturn(Lists
+        .newArrayList(new MetricsTag(KafkaMetricsInfo.KafkaTag, "test_tag")));
+    when(record.timestamp()).thenReturn(System.currentTimeMillis());
+
+    // Create a metric using the AbstractMetric class.
+    AbstractMetric metric = new AbstractMetric(KafkaMetricsInfo.KafkaCounter) {
+      @Override
+      public Number value() {
+        return Integer.valueOf(123);
+      }
+
+      @Override
+      public MetricType type() {
+        return null;
+      }
+
+      @Override
+      public void visit(MetricsVisitor visitor) {
+        // Not needed for this test.
+      }
+    };
+
+    // Create a list of metrics.
+    Iterable<AbstractMetric> metrics = Lists.newArrayList(metric);
+    when(record.name()).thenReturn("Kafka record name");
+    when(record.metrics()).thenReturn(metrics);
+    SubsetConfiguration conf = mock(SubsetConfiguration.class);
+    when(conf.getString(KafkaSink.BROKER_LIST)).thenReturn("localhost:9092");
+    String topic = "myTestKafkaTopic";
+    when(conf.getString(KafkaSink.TOPIC)).thenReturn(topic);
+
+    // Create the KafkaSink object and initialize it.
+    kafkaSink = new KafkaSink();
+    kafkaSink.init(conf);
+
+    // Create a mock KafkaProducer as the producer for KafkaSink.
+    Producer<Integer, byte[]> mockProducer = mock(KafkaProducer.class);
+    kafkaSink.setProducer(mockProducer);
+
+    // Create the json object from the record.
+    StringBuilder jsonLines = recordToJson(record);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("kafka message: " + jsonLines.toString());
+    }
+
+    // Send the record and store the result in a mock Future.
+    Future<RecordMetadata> f = mock(Future.class);
+    when(mockProducer.send((ProducerRecord) anyObject())).thenReturn(f);
+    kafkaSink.putMetrics(record);
+
+    // Capture the argument and verify it.
+    ArgumentCaptor<ProducerRecord> argument =
+        ArgumentCaptor.forClass(ProducerRecord.class);
+    verify(mockProducer).send(argument.capture());
+
+    // Compare the received data with the original one.
+    ProducerRecord<Integer, byte[]> data = (argument.getValue());
+    String jsonResult = new String(data.value());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("kafka result: " + jsonResult);
+    }
+    assertEquals(jsonLines.toString(), jsonResult);
+  }
+
+  StringBuilder recordToJson(MetricsRecord record) {
+    // Create a json object from a metrics record, mirroring KafkaSink.
+    StringBuilder jsonLines = new StringBuilder();
+    Long timestamp = record.timestamp();
+    Date currDate = new Date(timestamp);
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    String date = dateFormat.format(currDate);
+    SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");
+    String time = timeFormat.format(currDate);
+    String hostname = "null";
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+    } catch (Exception e) {
+      LOG.warn("Error getting hostname, going to continue");
+    }
+    jsonLines.append("{\"hostname\": \"" + hostname);
+    jsonLines.append("\", \"timestamp\": " + timestamp);
+    jsonLines.append(", \"date\": \"" + date);
+    jsonLines.append("\",\"time\": \"" + time);
+    jsonLines.append("\",\"name\": \"" + record.name() + "\" ");
+    for (MetricsTag tag : record.tags()) {
+      jsonLines.append(
+          ", \"" + tag.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
+      jsonLines.append(" \"" + tag.value().toString() + "\"");
+    }
+    for (AbstractMetric m : record.metrics()) {
+      jsonLines.append(
+          ", \"" + m.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
+      jsonLines.append(" \"" + m.value().toString() + "\"");
+    }
+    jsonLines.append("}");
+    return jsonLines;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b59e4341/hadoop-tools/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml
index 0061bf0..9f7e590 100644
--- a/hadoop-tools/pom.xml
+++ b/hadoop-tools/pom.xml
@@ -46,6 +46,7 @@
     <module>hadoop-sls</module>
     <module>hadoop-azure</module>
     <module>hadoop-aws</module>
+    <module>hadoop-kafka</module>
   </modules>
 
   <build>