Repository: flume Updated Branches: refs/heads/flume-1.6 159178aa6 -> 70ba4a97f
FLUME-2578. Kafka source throws NPE if Kafka record has null key (Gwen Shapira via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/70ba4a97 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/70ba4a97 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/70ba4a97 Branch: refs/heads/flume-1.6 Commit: 70ba4a97f11d8afd0f1d3f1eb31f7958ca808f29 Parents: 159178a Author: Hari Shreedharan <[email protected]> Authored: Tue Dec 30 11:42:47 2014 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Tue Dec 30 11:43:31 2014 -0800 ---------------------------------------------------------------------- .../apache/flume/source/kafka/KafkaSource.java | 4 +++- .../flume/source/kafka/TestKafkaSource.java | 25 ++++++++++++++++++-- 2 files changed, 26 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/70ba4a97/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 7bc03da..00a81c6 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -102,7 +102,9 @@ public class KafkaSource extends AbstractSource headers.put(KafkaSourceConstants.TIMESTAMP, String.valueOf(System.currentTimeMillis())); headers.put(KafkaSourceConstants.TOPIC, topic); - headers.put(KafkaSourceConstants.KEY, new String(kafkaKey)); + if (kafkaKey != null) { + headers.put(KafkaSourceConstants.KEY, new String(kafkaKey)); + } if (log.isDebugEnabled()) { log.debug("Message: {}", new String(kafkaMessage)); } http://git-wip-us.apache.org/repos/asf/flume/blob/70ba4a97/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index 72eec77..8ec14cc 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -105,8 +105,6 @@ public class TestKafkaSource { Assert.assertEquals("hello, world", new String(events.get(0).getBody(), Charsets.UTF_8)); - - } @SuppressWarnings("unchecked") @@ -301,6 +299,29 @@ public class TestKafkaSource { } + @SuppressWarnings("unchecked") + @Test + public void testNullKey() throws EventDeliveryException, + SecurityException, NoSuchFieldException, IllegalArgumentException, + IllegalAccessException, InterruptedException { + context.put(KafkaSourceConstants.BATCH_SIZE,"1"); + kafkaSource.configure(context); + kafkaSource.start(); + + Thread.sleep(500L); + + kafkaServer.produce(topicName, null , "hello, world"); + + Thread.sleep(500L); + + Assert.assertEquals(Status.READY, kafkaSource.process()); + Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); + Assert.assertEquals(1, events.size()); + + Assert.assertEquals("hello, world", new String(events.get(0).getBody(), + Charsets.UTF_8)); + } + ChannelProcessor createGoodChannel() { ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
