Updated Branches:
  refs/heads/trunk 0e0e6346d -> f2960cd49

FLUME-1537. Dump RollingFileSink's counter status when agent stops

(Ted Malaska via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f2960cd4
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f2960cd4
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f2960cd4

Branch: refs/heads/trunk
Commit: f2960cd491c1ac499720fe468b1bb4f3330a726f
Parents: 0e0e634
Author: Hari Shreedharan <[email protected]>
Authored: Thu Sep 13 14:18:05 2012 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Thu Sep 13 14:18:05 2012 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/RollingFileSink.java     |   31 +++++++++++----
 1 files changed, 23 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f2960cd4/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
index a94eea1..be640bb 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
@@ -28,12 +28,12 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.PathManager;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,26 +56,22 @@ public class RollingFileSink extends AbstractSink implements Configurable {
   private OutputStream outputStream;
   private ScheduledExecutorService rollService;
 
-  private Context context;
-
   private String serializerType;
   private Context serializerContext;
   private EventSerializer serializer;
 
-  private CounterGroup counterGroup;
+  private SinkCounter sinkCounter;
 
   private PathManager pathController;
   private volatile boolean shouldRotate;
 
   public RollingFileSink() {
-    counterGroup = new CounterGroup();
     pathController = new PathManager();
     shouldRotate = false;
   }
 
   @Override
   public void configure(Context context) {
-    this.context = context;
 
     String directory = context.getString("sink.directory");
     String rollInterval = context.getString("sink.rollInterval");
@@ -96,11 +92,16 @@ public class RollingFileSink extends AbstractSink implements Configurable {
     batchSize = context.getInteger("sink.batchSize", defaultBatchSize);
 
     this.directory = new File(directory);
+
+    if (sinkCounter == null) {
+      sinkCounter = new SinkCounter(getName());
+    }
   }
 
   @Override
   public void start() {
-
+    logger.info("Starting {}...", this);
+    sinkCounter.start();
     super.start();
 
     pathController.setBaseDirectory(directory);
@@ -131,6 +132,7 @@ public class RollingFileSink extends AbstractSink implements Configurable {
     } else{
       logger.info("RollInterval is not valid, file rolling will not happen.");
     }
+    logger.info("RollingFileSink {} started.", getName());
   }
 
   @Override
@@ -146,8 +148,10 @@ public class RollingFileSink extends AbstractSink implements Configurable {
           serializer.beforeClose();
           outputStream.flush();
           outputStream.close();
+          sinkCounter.incrementConnectionClosedCount();
           shouldRotate = false;
         } catch (IOException e) {
+          sinkCounter.incrementConnectionFailedCount();
           throw new EventDeliveryException("Unable to rotate file "
               + pathController.getCurrentFile() + " while delivering event", e);
         }
@@ -167,7 +171,9 @@ public class RollingFileSink extends AbstractSink implements Configurable {
         serializer = EventSerializerFactory.getInstance(
             serializerType, serializerContext, outputStream);
         serializer.afterCreate();
+        sinkCounter.incrementConnectionCreatedCount();
       } catch (IOException e) {
+        sinkCounter.incrementConnectionFailedCount();
         throw new EventDeliveryException("Failed to open file "
             + pathController.getCurrentFile() + " while delivering event", e);
       }
@@ -180,9 +186,12 @@ public class RollingFileSink extends AbstractSink implements Configurable {
 
     try {
       transaction.begin();
+      int eventAttemptCounter = 0;
       for (int i = 0; i < batchSize; i++) {
         event = channel.take();
         if (event != null) {
+          sinkCounter.incrementEventDrainAttemptCount();
+          eventAttemptCounter++;
           serializer.write(event);
 
           /*
@@ -203,6 +212,7 @@ public class RollingFileSink extends AbstractSink implements Configurable {
       serializer.flush();
       outputStream.flush();
       transaction.commit();
+      sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter);
     } catch (Exception ex) {
       transaction.rollback();
       throw new EventDeliveryException("Failed to process transaction", ex);
@@ -215,7 +225,8 @@ public class RollingFileSink extends AbstractSink implements Configurable {
 
   @Override
   public void stop() {
-
+    logger.info("RollingFile sink {} stopping...", getName());
+    sinkCounter.stop();
     super.stop();
 
     if (outputStream != null) {
@@ -226,7 +237,9 @@ public class RollingFileSink extends AbstractSink implements Configurable {
         serializer.beforeClose();
         outputStream.flush();
         outputStream.close();
+        sinkCounter.incrementConnectionClosedCount();
       } catch (IOException e) {
+        sinkCounter.incrementConnectionFailedCount();
         logger.error("Unable to close output stream. Exception follows.", e);
       }
     }
@@ -244,6 +257,8 @@ public class RollingFileSink extends AbstractSink implements Configurable {
         }
       }
     }
+    logger.info("RollingFile sink {} stopped. Event metrics: {}",
+        getName(), sinkCounter);
   }
 
   public File getDirectory() {

Reply via email to