Updated Branches: refs/heads/flume-1.4 e488503c1 -> c954dfc76
FLUME-2081. JMX metrics support for SpoolDir. (Sravya Tirukkovalur via Mike Percy)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c954dfc7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c954dfc7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c954dfc7

Branch: refs/heads/flume-1.4
Commit: c954dfc769850cf4a9675be303b98d33efb8ac45
Parents: e488503
Author: Mike Percy <[email protected]>
Authored: Fri Jun 21 12:12:40 2013 -0700
Committer: Mike Percy <[email protected]>
Committed: Fri Jun 21 12:13:52 2013 -0700

----------------------------------------------------------------------
 .../flume/source/SpoolDirectorySource.java | 25 +++++++++++++-------
 1 file changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/c954dfc7/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 641b5c6..7145580 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flume.*;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.serialization.LineDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ Configurable, EventDrivenSource {
   private String deletePolicy;
   private String inputCharset;
 
-  private CounterGroup counterGroup;
+  private SourceCounter sourceCounter;
   ReliableSpoolingFileEventReader reader;
 
   @Override
@@ -68,7 +69,6 @@ Configurable, EventDrivenSource {
 
     ScheduledExecutorService executor =
         Executors.newSingleThreadScheduledExecutor();
-    counterGroup = new CounterGroup();
 
     File directory = new File(spoolDirectory);
     try {
@@ -89,17 +89,21 @@ Configurable, EventDrivenSource {
           ioe);
     }
 
-    Runnable runner = new SpoolDirectoryRunnable(reader, counterGroup);
+    Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
     executor.scheduleWithFixedDelay(
         runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
 
     super.start();
     logger.debug("SpoolDirectorySource source started");
+    sourceCounter.start();
   }
 
   @Override
   public void stop() {
     super.stop();
+    sourceCounter.stop();
+    logger.info("SpoolDir source {} stopped. Metrics: {}", getName(),
+      sourceCounter);
   }
 
   @Override
@@ -134,17 +138,19 @@ Configurable, EventDrivenSource {
       deserializerContext.put(LineDeserializer.MAXLINE_KEY,
           bufferMaxLineLength.toString());
     }
-
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
   }
 
   private class SpoolDirectoryRunnable implements Runnable {
     private ReliableSpoolingFileEventReader reader;
-    private CounterGroup counterGroup;
+    private SourceCounter sourceCounter;
 
     public SpoolDirectoryRunnable(ReliableSpoolingFileEventReader reader,
-        CounterGroup counterGroup) {
+        SourceCounter sourceCounter) {
       this.reader = reader;
-      this.counterGroup = counterGroup;
+      this.sourceCounter = sourceCounter;
     }
 
     @Override
@@ -155,10 +161,13 @@ Configurable, EventDrivenSource {
           if (events.isEmpty()) {
             break;
           }
-          counterGroup.addAndGet("spooler.events.read", (long) events.size());
+          sourceCounter.addToEventReceivedCount(events.size());
+          sourceCounter.incrementAppendBatchReceivedCount();
 
           getChannelProcessor().processEventBatch(events);
           reader.commit();
+          sourceCounter.addToEventAcceptedCount(events.size());
+          sourceCounter.incrementAppendBatchAcceptedCount();
         }
       } catch (Throwable t) {
         logger.error("Uncaught exception in Runnable", t);
