Repository: flume Updated Branches: refs/heads/trunk 84465664c -> 199684b62
FLUME-2578. Kafka source throws NPE if Kafka record has null key (Gwen Shapira via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/199684b6 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/199684b6 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/199684b6 Branch: refs/heads/trunk Commit: 199684b62ec983b8f922b1d6d706479032a18e64 Parents: 8446566 Author: Hari Shreedharan <[email protected]> Authored: Tue Dec 30 11:42:47 2014 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Tue Dec 30 11:42:47 2014 -0800 ---------------------------------------------------------------------- .../apache/flume/source/kafka/KafkaSource.java | 4 +++- .../flume/source/kafka/TestKafkaSource.java | 25 ++++++++++++++++++-- 2 files changed, 26 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/199684b6/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 7bc03da..00a81c6 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -102,7 +102,9 @@ public class KafkaSource extends AbstractSource headers.put(KafkaSourceConstants.TIMESTAMP, String.valueOf(System.currentTimeMillis())); headers.put(KafkaSourceConstants.TOPIC, topic); - headers.put(KafkaSourceConstants.KEY, new String(kafkaKey)); + if (kafkaKey != null) { + headers.put(KafkaSourceConstants.KEY, new String(kafkaKey)); + } if (log.isDebugEnabled()) { log.debug("Message: {}", new String(kafkaMessage)); } http://git-wip-us.apache.org/repos/asf/flume/blob/199684b6/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index 72eec77..8ec14cc 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -105,8 +105,6 @@ public class TestKafkaSource { Assert.assertEquals("hello, world", new String(events.get(0).getBody(), Charsets.UTF_8)); - - } @SuppressWarnings("unchecked") @@ -301,6 +299,29 @@ public class TestKafkaSource { } + @SuppressWarnings("unchecked") + @Test + public void testNullKey() throws EventDeliveryException, + SecurityException, NoSuchFieldException, IllegalArgumentException, + IllegalAccessException, InterruptedException { + context.put(KafkaSourceConstants.BATCH_SIZE,"1"); + kafkaSource.configure(context); + kafkaSource.start(); + + Thread.sleep(500L); + + kafkaServer.produce(topicName, null , "hello, world"); + + Thread.sleep(500L); + + Assert.assertEquals(Status.READY, kafkaSource.process()); + Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); + Assert.assertEquals(1, events.size()); + + Assert.assertEquals("hello, world", new String(events.get(0).getBody(), + Charsets.UTF_8)); + } + ChannelProcessor createGoodChannel() { ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
