Repository: flume Updated Branches: refs/heads/flume-1.6 25f331d0e -> 02973a98a
FLUME-2499. Include Kafka Message Key in Event Headers. (Ricky Saltzer via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/02973a98 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/02973a98 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/02973a98 Branch: refs/heads/flume-1.6 Commit: 02973a98a223c4b72126b6db3b1b6c1dbdc9239a Parents: 25f331d Author: Hari Shreedharan <[email protected]> Authored: Wed Oct 15 22:20:34 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Wed Oct 15 22:20:34 2014 -0700 ---------------------------------------------------------------------- .../apache/flume/source/kafka/KafkaSource.java | 67 ++++++++++++-------- .../source/kafka/KafkaSourceConstants.java | 1 + 2 files changed, 41 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/02973a98/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 9d77b47..7bc03da 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -27,6 +27,7 @@ import kafka.consumer.ConsumerTimeoutException; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.MessageAndMetadata; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurationException; @@ -38,23 +39,30 @@ import org.slf4j.LoggerFactory; /** - * A Source for Kafka which reads messages from kafka. - * I use this in company production environment and its performance is good. - * Over 100k messages per second can be read from kafka in one source.<p> - * <tt>kafka.zookeeper.connect: </tt> the zookeeper ip kafka use.<p> - * <tt>kafka.group.id: </tt> the groupid of consumer group.<p> - * <tt>topic: </tt> the topic to read from kafka.<p> - * maxBatchSize - maximum number of messages written to Channel in one batch - * maxBatchDurationMillis - maximum time before a batch (of any size) - * will be written to Channel - * kafka.auto.commit.enable - if true, commit automatically every time period. - * if false, commit on each batch. - * kafka.consumer.timeout.ms - polling interval for new data for batch. - * Low value means more CPU usage. - * High value means the time.upper.limit may be missed. + * A Source for Kafka which reads messages from a kafka topic. * - * Any property starting with "kafka" will be passed to the kafka consumer - * So you can use any configuration supported by Kafka 0.8.1.1 + * <tt>zookeeperConnect: </tt> Kafka's zookeeper connection string. + * <b>Required</b> + * <p> + * <tt>groupId: </tt> the group ID of consumer group. <b>Required</b> + * <p> + * <tt>topic: </tt> the topic to consume messages from. <b>Required</b> + * <p> + * <tt>maxBatchSize: </tt> Maximum number of messages written to Channel in one + * batch. Default: 1000 + * <p> + * <tt>maxBatchDurationMillis: </tt> Maximum number of milliseconds before a + * batch (of any size) will be written to a channel. Default: 1000 + * <p> + * <tt>kafka.auto.commit.enable: </tt> If true, commit automatically every time + * period. if false, commit on each batch. Default: false + * <p> + * <tt>kafka.consumer.timeout.ms: </tt> Polling interval for new data for batch. + * Low value means more CPU usage. High value means the time.upper.limit may be + * missed. Default: 10 + * + * Any property starting with "kafka" will be passed to the kafka consumer So + * you can use any configuration supported by Kafka 0.8.1.1 */ public class KafkaSource extends AbstractSource implements Configurable, PollableSource { @@ -72,7 +80,8 @@ public class KafkaSource extends AbstractSource public Status process() throws EventDeliveryException { - byte[] bytes; + byte[] kafkaMessage; + byte[] kafkaKey; Event event; Map<String, String> headers; long batchStartTime = System.currentTimeMillis(); @@ -84,16 +93,20 @@ public class KafkaSource extends AbstractSource iterStatus = hasNext(); if (iterStatus) { // get next message - bytes = it.next().message(); + MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next(); + kafkaMessage = messageAndMetadata.message(); + kafkaKey = messageAndMetadata.key(); + // Add headers to event (topic, timestamp, and key) headers = new HashMap<String, String>(); headers.put(KafkaSourceConstants.TIMESTAMP, String.valueOf(System.currentTimeMillis())); - headers.put(KafkaSourceConstants.TOPIC,topic); + headers.put(KafkaSourceConstants.TOPIC, topic); + headers.put(KafkaSourceConstants.KEY, new String(kafkaKey)); if (log.isDebugEnabled()) { - log.debug("Message: {}", new String(bytes)); + log.debug("Message: {}", new String(kafkaMessage)); } - event = EventBuilder.withBody(bytes, headers); + event = EventBuilder.withBody(kafkaMessage, headers); eventList.add(event); } if (log.isDebugEnabled()) { @@ -132,12 +145,12 @@ public class KafkaSource extends AbstractSource * We configure the source and generate properties for the Kafka Consumer * * Kafka Consumer properties are generated as follows: - * 1. Generate a properties object with some static defaults that - * can be overridden by Source configuration - * 2. We add the configuration users added for Kafka (parameters starting - * with kafka. and must be valid Kafka Consumer properties - * 3. We add the source documented parameters which can override other - * properties + * + * 1. Generate a properties object with some static defaults that can be + * overridden by Source configuration 2. We add the configuration users added + * for Kafka (parameters starting with kafka. and must be valid Kafka Consumer + * properties 3. We add the source documented parameters which can override + * other properties * * @param context */ http://git-wip-us.apache.org/repos/asf/flume/blob/02973a98/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java index 7390618..911012c 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java @@ -18,6 +18,7 @@ package org.apache.flume.source.kafka; public class KafkaSourceConstants { public static final String TOPIC = "topic"; + public static final String KEY = "key"; public static final String TIMESTAMP = "timestamp"; public static final String BATCH_SIZE = "batchSize"; public static final String BATCH_DURATION_MS = "batchDurationMillis";
