Updated Branches: refs/heads/flume-1.4 ddd0a9365 -> 9dade66ae
FLUME-1689: BodyTextSerializer should allow an option not to add a newline to each serialized event (Mike Percy via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9dade66a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9dade66a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9dade66a Branch: refs/heads/flume-1.4 Commit: 9dade66ae867a0df1b9c1327c62a9e26e41a625c Parents: ddd0a93 Author: Brock Noland <[email protected]> Authored: Fri Nov 9 13:53:05 2012 -0600 Committer: Brock Noland <[email protected]> Committed: Fri Nov 9 13:55:08 2012 -0600 ---------------------------------------------------------------------- .../serialization/BodyTextEventSerializer.java | 23 +++--- .../serialization/TestBodyTextEventSerializer.java | 42 ++++++++-- flume-ng-doc/sphinx/FlumeUserGuide.rst | 58 +++++++++++++++ 3 files changed, 103 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/9dade66a/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java index dc291cd..d09f02d 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.OutputStream; import org.apache.flume.Context; import org.apache.flume.Event; -import org.apache.flume.conf.Configurable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,23 +29,24 @@ import org.slf4j.LoggerFactory; * This class simply writes the body of the event to the output stream * and appends a newline after each event. */ -public class BodyTextEventSerializer implements EventSerializer, Configurable { +public class BodyTextEventSerializer implements EventSerializer { private final static Logger logger = LoggerFactory.getLogger(BodyTextEventSerializer.class); + // for legacy reasons, by default, append a newline to each event written out + private final String APPEND_NEWLINE = "appendNewline"; + private final boolean APPEND_NEWLINE_DFLT = true; + private final OutputStream out; + private final boolean appendNewline; - private BodyTextEventSerializer(OutputStream out) { + private BodyTextEventSerializer(OutputStream out, Context ctx) { + this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT); this.out = out; } @Override - public void configure(Context context) { - // noop - } - - @Override public boolean supportsReopen() { return true; } @@ -69,7 +69,9 @@ public class BodyTextEventSerializer implements EventSerializer, Configurable { @Override public void write(Event e) throws IOException { out.write(e.getBody()); - out.write('\n'); + if (appendNewline) { + out.write('\n'); + } } @Override @@ -81,8 +83,7 @@ public class BodyTextEventSerializer implements EventSerializer, Configurable { @Override public EventSerializer build(Context context, OutputStream out) { - BodyTextEventSerializer s = new BodyTextEventSerializer(out); - s.configure(context); + BodyTextEventSerializer s = new BodyTextEventSerializer(out, context); return s; } http://git-wip-us.apache.org/repos/asf/flume/blob/9dade66a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java index e3cb255..b1a6c13 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java @@ -27,6 +27,7 @@ import java.io.FileReader; import java.io.IOException; import java.io.OutputStream; import org.apache.commons.io.FileUtils; +import org.apache.flume.Context; import org.apache.flume.event.EventBuilder; import org.junit.Assert; import org.junit.Test; @@ -34,13 +35,14 @@ import org.junit.Test; public class TestBodyTextEventSerializer { File testFile = new File("src/test/resources/events.txt"); + File expectedFile = new File("src/test/resources/events.txt"); @Test - public void test() throws FileNotFoundException, IOException { + public void testWithNewline() throws FileNotFoundException, IOException { OutputStream out = new FileOutputStream(testFile); EventSerializer serializer = - EventSerializerFactory.getInstance("text", null, out); + EventSerializerFactory.getInstance("text", new Context(), out); serializer.afterCreate(); serializer.write(EventBuilder.withBody("event 1", Charsets.UTF_8)); serializer.write(EventBuilder.withBody("event 2", Charsets.UTF_8)); @@ -51,14 +53,36 @@ public class TestBodyTextEventSerializer { out.close(); BufferedReader reader = new BufferedReader(new FileReader(testFile)); - String line; - int num = 0; - while ((line = reader.readLine()) != null) { - System.out.println(line); - num++; - } + Assert.assertEquals("event 1", reader.readLine()); + Assert.assertEquals("event 2", reader.readLine()); + Assert.assertEquals("event 3", reader.readLine()); + Assert.assertNull(reader.readLine()); - Assert.assertEquals(3, num); + FileUtils.forceDelete(testFile); + } + + @Test + public void testNoNewline() throws FileNotFoundException, IOException { + + OutputStream out = new FileOutputStream(testFile); + Context context = new Context(); + context.put("appendNewline", "false"); + EventSerializer serializer = + EventSerializerFactory.getInstance("text", context, out); + serializer.afterCreate(); + serializer.write(EventBuilder.withBody("event 1\n", Charsets.UTF_8)); + serializer.write(EventBuilder.withBody("event 2\n", Charsets.UTF_8)); + serializer.write(EventBuilder.withBody("event 3\n", Charsets.UTF_8)); + serializer.flush(); + serializer.beforeClose(); + out.flush(); + out.close(); + + BufferedReader reader = new BufferedReader(new FileReader(testFile)); + Assert.assertEquals("event 1", reader.readLine()); + Assert.assertEquals("event 2", reader.readLine()); + Assert.assertEquals("event 3", reader.readLine()); + Assert.assertNull(reader.readLine()); FileUtils.forceDelete(testFile); } http://git-wip-us.apache.org/repos/asf/flume/blob/9dade66a/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index ab2c923..a9590ec 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1866,6 +1866,64 @@ Custom Sink Processor Custom sink processors are not supported at the moment. +Event Serializers +----------------- + +The ``FILE_ROLL`` sink and the ``HDFS`` sink both support the +``EventSerializer`` interface. Details of the EventSerializers that ship with +Flume are provided below. + +Body Text Serializer +~~~~~~~~~~~~~~~~~~~~ + +Alias: ``TEXT``. This interceptor writes the body of the event to an output +stream without any transformation or modification. The event headers are +ignored. Configuration options are as follows: + +========================= ================ =========================================================================== +Property Name Default Description +========================= ================ =========================================================================== +appendNewline true Whether a newline will be appended to each event at write time. The default + of true assumes that events do not contain newlines, for legacy reasons. +========================= ================ =========================================================================== + +Example for agent named **agent_foo**: + +.. code-block:: properties + + agent_foo.sinks = fileSink-1 + agent_foo.sinks.fileSink-1.type = FILE_ROLL + agent_foo.sinks.fileSink-1.channel = memoryChannel-1 + agent_foo.sinks.fileSink-1.sink.directory = /var/log/flume + agent_foo.sinks.fileSink-1.sink.serializer = TEXT + agent_foo.sinks.fileSink-1.sink.serializer.appendNewline = false + +Avro Event Serializer +~~~~~~~~~~~~~~~~~~~~~ + +Alias: ``AVRO_EVENT``. This interceptor serializes Flume events into an Avro +container file. The schema used is the same schema used for Flume events +in the Avro RPC mechanism. This serializers inherits from the +``AbstractAvroEventSerializer`` class. Configuration options are as follows: + +========================== ================ =========================================================================== +Property Name Default Description +========================== ================ =========================================================================== +syncIntervalBytes 2048000 Avro sync interval, in approximate bytes. +compressionCodec null Avro compression codec. For supported codecs, see Avro's CodecFactory docs. +========================== ================ =========================================================================== + +Example for agent named **agent_foo**: + +.. code-block:: properties + + agent_foo.sinks.hdfsSink-1.type = hdfs + agent_foo.sinks.hdfsSink-1.channel = memoryChannel-1 + agent_foo.sinks.hdfsSink-1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S + agent_foo.sinks.hdfsSink-1.serializer = AVRO_EVENT + agent_foo.sinks.hdfsSink-1.serializer.compressionCodec = snappy + + Flume Interceptors ------------------
