Repository: flume Updated Branches: refs/heads/flume-1.7 7dbe6fef6 -> faad35801
FLUME-2781. Kafka Channel with parseAsFlumeEvent=false should write data as is, not as flume events. (Gonzalo Herreros via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/faad3580 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/faad3580 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/faad3580 Branch: refs/heads/flume-1.7 Commit: faad35801b24b9f0ca34d8b86f28dded468d73b8 Parents: 7dbe6fe Author: Hari Shreedharan <[email protected]> Authored: Thu Oct 8 18:28:31 2015 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Thu Oct 8 18:29:47 2015 -0700 ---------------------------------------------------------------------- .../flume/channel/kafka/KafkaChannel.java | 33 +++++++++-------- .../flume/channel/kafka/TestKafkaChannel.java | 37 ++++++++++++++++++++ 2 files changed, 56 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/faad3580/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index c83d4f6..c0c1c66 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -261,21 +261,26 @@ public class KafkaChannel extends BasicChannelSemantics { } try { - if (!tempOutStream.isPresent()) { - tempOutStream = Optional.of(new ByteArrayOutputStream()); - } - if (!writer.isPresent()) { - writer = Optional.of(new - SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class)); + if 
(parseAsFlumeEvent) { + if (!tempOutStream.isPresent()) { + tempOutStream = Optional.of(new ByteArrayOutputStream()); + } + if (!writer.isPresent()) { + writer = Optional.of(new + SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class)); + } + tempOutStream.get().reset(); + AvroFlumeEvent e = new AvroFlumeEvent( + toCharSeqMap(event.getHeaders()), + ByteBuffer.wrap(event.getBody())); + encoder = EncoderFactory.get() + .directBinaryEncoder(tempOutStream.get(), encoder); + writer.get().write(e, encoder); + // Not really possible to avoid this copy :( + serializedEvents.get().add(tempOutStream.get().toByteArray()); + } else { + serializedEvents.get().add(event.getBody()); } - tempOutStream.get().reset(); - AvroFlumeEvent e = new AvroFlumeEvent( - toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody())); - encoder = EncoderFactory.get() - .directBinaryEncoder(tempOutStream.get(), encoder); - writer.get().write(e, encoder); - // Not really possible to avoid this copy :( - serializedEvents.get().add(tempOutStream.get().toByteArray()); } catch (Exception e) { throw new ChannelException("Error while serializing event", e); } http://git-wip-us.apache.org/repos/asf/flume/blob/faad3580/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java index 25b9e40..319e779 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java @@ -168,6 +168,43 @@ public class TestKafkaChannel { } /** + * Like the previous test but here we write to the channel like a Flume source 
would do + * to verify that the events are written as text and not as an Avro object + * + * @throws Exception + */ + @Test + public void testWritingToNoParsingAsFlumeAgent() throws Exception { + final KafkaChannel channel = startChannel(false); + + List<String> msgs = new ArrayList<String>(); + for (int i = 0; i < 50; i++){ + msgs.add(String.valueOf(i)); + } + Transaction tx = channel.getTransaction(); + tx.begin(); + for (int i = 0; i < msgs.size(); i++){ + channel.put(EventBuilder.withBody(msgs.get(i).getBytes())); + } + tx.commit(); + ExecutorCompletionService<Void> submitterSvc = new + ExecutorCompletionService<Void>(Executors.newCachedThreadPool()); + List<Event> events = pullEvents(channel, submitterSvc, + 50, false, false); + wait(submitterSvc, 5); + Set<Integer> finals = Sets.newHashSet(); + for (int i = 0; i < 50; i++) { + finals.add(Integer.parseInt(new String(events.get(i).getBody()))); + } + for (int i = 0; i < 50; i++) { + Assert.assertTrue(finals.contains(i)); + finals.remove(i); + } + Assert.assertTrue(finals.isEmpty()); + channel.stop(); + } + + /** * This method starts a channel, puts events into it. The channel is then * stopped and restarted. Then we check to make sure if all events we put * come out. Optionally, 10 events are rolled back,
