Updated Branches: refs/heads/flume-1.3.0 abf5f2392 -> 8222c4f9b
FLUME-1537. Dump RollingFileSink's counter status when agent stops (Ted Malaska via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/8222c4f9 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/8222c4f9 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/8222c4f9 Branch: refs/heads/flume-1.3.0 Commit: 8222c4f9b7ba60852882c5db3cec32c9db6e988f Parents: abf5f23 Author: Hari Shreedharan <[email protected]> Authored: Thu Sep 13 14:18:05 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Thu Sep 13 14:19:14 2012 -0700 ---------------------------------------------------------------------- .../org/apache/flume/sink/RollingFileSink.java | 31 +++++++++++---- 1 files changed, 23 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/8222c4f9/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java index a94eea1..be640bb 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java @@ -28,12 +28,12 @@ import java.util.concurrent.TimeUnit; import org.apache.flume.Channel; import org.apache.flume.Context; -import org.apache.flume.CounterGroup; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.formatter.output.PathManager; +import org.apache.flume.instrumentation.SinkCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,26 +56,22 @@ public class RollingFileSink extends AbstractSink implements Configurable { private OutputStream outputStream; private ScheduledExecutorService rollService; - private Context context; - private String serializerType; private Context serializerContext; private EventSerializer serializer; - private CounterGroup counterGroup; + private SinkCounter sinkCounter; private PathManager pathController; private volatile boolean shouldRotate; public RollingFileSink() { - counterGroup = new CounterGroup(); pathController = new PathManager(); shouldRotate = false; } @Override public void configure(Context context) { - this.context = context; String directory = context.getString("sink.directory"); String rollInterval = context.getString("sink.rollInterval"); @@ -96,11 +92,16 @@ public class RollingFileSink extends AbstractSink implements Configurable { batchSize = context.getInteger("sink.batchSize", defaultBatchSize); this.directory = new File(directory); + + if (sinkCounter == null) { + sinkCounter = new SinkCounter(getName()); + } } @Override public void start() { - + logger.info("Starting {}...", this); + sinkCounter.start(); super.start(); pathController.setBaseDirectory(directory); @@ -131,6 +132,7 @@ public class RollingFileSink extends AbstractSink implements Configurable { } else{ logger.info("RollInterval is not valid, file rolling will not happen."); } + logger.info("RollingFileSink {} started.", getName()); } @Override @@ -146,8 +148,10 @@ public class RollingFileSink extends AbstractSink implements Configurable { serializer.beforeClose(); outputStream.flush(); outputStream.close(); + sinkCounter.incrementConnectionClosedCount(); shouldRotate = false; } catch (IOException e) { + sinkCounter.incrementConnectionFailedCount(); throw new EventDeliveryException("Unable to rotate file " + pathController.getCurrentFile() + " while delivering event", e); } @@ -167,7 +171,9 @@ public class RollingFileSink extends AbstractSink implements Configurable { serializer = EventSerializerFactory.getInstance( serializerType, serializerContext, outputStream); serializer.afterCreate(); + sinkCounter.incrementConnectionCreatedCount(); } catch (IOException e) { + sinkCounter.incrementConnectionFailedCount(); throw new EventDeliveryException("Failed to open file " + pathController.getCurrentFile() + " while delivering event", e); } @@ -180,9 +186,12 @@ public class RollingFileSink extends AbstractSink implements Configurable { try { transaction.begin(); + int eventAttemptCounter = 0; for (int i = 0; i < batchSize; i++) { event = channel.take(); if (event != null) { + sinkCounter.incrementEventDrainAttemptCount(); + eventAttemptCounter++; serializer.write(event); /* @@ -203,6 +212,7 @@ public class RollingFileSink extends AbstractSink implements Configurable { serializer.flush(); outputStream.flush(); transaction.commit(); + sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter); } catch (Exception ex) { transaction.rollback(); throw new EventDeliveryException("Failed to process transaction", ex); @@ -215,7 +225,8 @@ public class RollingFileSink extends AbstractSink implements Configurable { @Override public void stop() { - + logger.info("RollingFile sink {} stopping...", getName()); + sinkCounter.stop(); super.stop(); if (outputStream != null) { @@ -226,7 +237,9 @@ public class RollingFileSink extends AbstractSink implements Configurable { serializer.beforeClose(); outputStream.flush(); outputStream.close(); + sinkCounter.incrementConnectionClosedCount(); } catch (IOException e) { + sinkCounter.incrementConnectionFailedCount(); logger.error("Unable to close output stream. Exception follows.", e); } } @@ -244,6 +257,8 @@ public class RollingFileSink extends AbstractSink implements Configurable { } } } + logger.info("RollingFile sink {} stopped. Event metrics: {}", + getName(), sinkCounter); } public File getDirectory() {
