Repository: flume Updated Branches: refs/heads/trunk 22cd3909b -> ac999bebe
FLUME-2246. Make event data size configurable for logger sinker (Ashish Paliwal via Roshan Naik) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ac999beb Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ac999beb Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ac999beb Branch: refs/heads/trunk Commit: ac999bebeaa59659960fe9e998a0c20b1d476564 Parents: 22cd390 Author: Roshan Naik <[email protected]> Authored: Wed Nov 19 20:28:23 2014 -0800 Committer: Roshan Naik <[email protected]> Committed: Wed Nov 19 20:28:23 2014 -0800 ---------------------------------------------------------------------- .../java/org/apache/flume/sink/LoggerSink.java | 32 +++++++++++++++++--- .../org/apache/flume/sink/TestLoggerSink.java | 23 ++++++++++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 1 + 3 files changed, 52 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ac999beb/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java index 128fa84..9cf9bc2 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java @@ -18,18 +18,20 @@ package org.apache.flume.sink; +import com.google.common.base.Strings; import org.apache.flume.Channel; +import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; -import org.apache.flume.Sink; import org.apache.flume.Transaction; +import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * <p> - * A {@link Sink} implementation that logs all events received at the INFO level + * A {@link org.apache.flume.Sink} implementation that logs all events received at the INFO level * to the <tt>org.apache.flume.sink.LoggerSink</tt> logger. * </p> * <p> @@ -49,11 +51,33 @@ import org.slf4j.LoggerFactory; * TODO * </p> */ -public class LoggerSink extends AbstractSink { +public class LoggerSink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory .getLogger(LoggerSink.class); + // Default Max bytes to dump + public static final int DEFAULT_MAX_BYTE_DUMP = 16; + + // Max number of bytes to be dumped + private int maxBytesToLog = DEFAULT_MAX_BYTE_DUMP; + + public static final String MAX_BYTES_DUMP_KEY = "maxBytesToLog"; + + @Override + public void configure(Context context) { + String strMaxBytes = context.getString(MAX_BYTES_DUMP_KEY); + if (!Strings.isNullOrEmpty(strMaxBytes)) { + try { + maxBytesToLog = Integer.parseInt(strMaxBytes); + } catch (NumberFormatException e) { + logger.warn(String.format("Unable to convert %s to integer, using default value(%d) for maxByteToDump", + strMaxBytes, DEFAULT_MAX_BYTE_DUMP)); + maxBytesToLog = DEFAULT_MAX_BYTE_DUMP; + } + } + } + @Override public Status process() throws EventDeliveryException { Status result = Status.READY; @@ -67,7 +91,7 @@ public class LoggerSink extends AbstractSink { if (event != null) { if (logger.isInfoEnabled()) { - logger.info("Event: " + EventHelper.dumpEvent(event)); + logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog)); } } else { // No event found, request back-off semantics from the sink runner http://git-wip-us.apache.org/repos/asf/flume/blob/ac999beb/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java index 92ff6fe..3257ced 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java @@ -19,6 +19,7 @@ package org.apache.flume.sink; +import com.google.common.base.Strings; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; @@ -56,6 +57,28 @@ public class TestLoggerSink { for (int i = 0; i < 10; i++) { Event event = EventBuilder.withBody(("Test " + i).getBytes()); + channel.put(event); + sink.process(); + } + + sink.stop(); + } + + @Test + public void testAppendWithCustomSize() throws InterruptedException, LifecycleException, + EventDeliveryException { + + Channel channel = new PseudoTxnMemoryChannel(); + Context context = new Context(); + context.put(LoggerSink.MAX_BYTES_DUMP_KEY, String.valueOf(30)); + Configurables.configure(channel, context); + Configurables.configure(sink, context); + + sink.setChannel(channel); + sink.start(); + + for (int i = 0; i < 10; i++) { + Event event = EventBuilder.withBody((Strings.padStart("Test " + i, 30, 'P')).getBytes()); channel.put(event); sink.process(); http://git-wip-us.apache.org/repos/asf/flume/blob/ac999beb/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 0199d62..bcadc2d 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1732,6 +1732,7 @@ Property Name Default Description ============== ======= =========================================== **channel** -- **type** -- The component type name, needs to be ``logger`` +maxBytesToLog 16 Maximum number of bytes of the Event body to log ============== ======= =========================================== Example for agent named a1:
