Updated Branches: refs/heads/trunk e442c29a6 -> 791f443fa
FLUME-2072. JMX metrics support for HBase Sink (Sravya Tirukkovalur via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/791f443f Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/791f443f Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/791f443f Branch: refs/heads/trunk Commit: 791f443fae173054cf29ac52fee8e9cf7fe70dc7 Parents: e442c29 Author: Hari Shreedharan <[email protected]> Authored: Fri Jun 7 14:54:00 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Jun 7 14:55:27 2013 -0700 ---------------------------------------------------------------------- .../org/apache/flume/sink/hbase/HBaseSink.java | 29 ++++++++++++--- 1 files changed, 23 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/791f443f/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java index a8ec87e..d5996c3 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java @@ -24,12 +24,12 @@ import java.util.List; import org.apache.flume.Channel; import org.apache.flume.Context; -import org.apache.flume.CounterGroup; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; +import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -88,7 +88,6 @@ public class HBaseSink extends AbstractSink implements Configurable { private HTable table; private long batchSize; private Configuration config; - private CounterGroup counterGroup = new CounterGroup(); private static final Logger logger = LoggerFactory.getLogger(HBaseSink.class); private HbaseEventSerializer serializer; private String eventSerializerType; @@ -97,6 +96,7 @@ public class HBaseSink extends AbstractSink implements Configurable { private String kerberosKeytab; private User hbaseUser; private boolean enableWal = true; + private SinkCounter sinkCounter; public HBaseSink(){ this(HBaseConfiguration.create()); @@ -116,6 +116,7 @@ public class HBaseSink extends AbstractSink implements Configurable { kerberosPrincipal, kerberosKeytab); } } catch (Exception ex) { + sinkCounter.incrementConnectionFailedCount(); throw new FlumeException("Failed to login to HBase using " + "provided credentials.", ex); } @@ -131,6 +132,7 @@ public class HBaseSink extends AbstractSink implements Configurable { } }); } catch (Exception e) { + sinkCounter.incrementConnectionFailedCount(); logger.error("Could not load table, " + tableName + " from HBase", e); throw new FlumeException("Could not load table, " + tableName + @@ -149,6 +151,7 @@ public class HBaseSink extends AbstractSink implements Configurable { } catch (Exception e) { //Get getTableDescriptor also throws IOException, so catch the IOException //thrown above or by the getTableDescriptor() call. + sinkCounter.incrementConnectionFailedCount(); throw new FlumeException("Error getting column family from HBase." + "Please verify that the table " + tableName + " and Column Family, " + Bytes.toString(columnFamily) + " exists in HBase, and the" @@ -156,6 +159,8 @@ public class HBaseSink extends AbstractSink implements Configurable { } super.start(); + sinkCounter.incrementConnectionCreatedCount(); + sinkCounter.start(); } @Override @@ -166,6 +171,8 @@ public class HBaseSink extends AbstractSink implements Configurable { } catch (IOException e) { throw new FlumeException("Error closing table.", e); } + sinkCounter.incrementConnectionClosedCount(); + sinkCounter.stop(); } @SuppressWarnings("unchecked") @@ -214,6 +221,7 @@ public class HBaseSink extends AbstractSink implements Configurable { "writes to HBase will have WAL disabled, and any data in the " + "memstore of this region in the Region Server could be lost!"); } + sinkCounter = new SinkCounter(this.getName()); } @Override @@ -224,11 +232,16 @@ public class HBaseSink extends AbstractSink implements Configurable { List<Row> actions = new LinkedList<Row>(); List<Increment> incs = new LinkedList<Increment>(); txn.begin(); - for(long i = 0; i < batchSize; i++) { + long i = 0; + for(; i < batchSize; i++) { Event event = channel.take(); if(event == null){ status = Status.BACKOFF; - counterGroup.incrementAndGet("channel.underflow"); + if (i == 0) { + sinkCounter.incrementBatchEmptyCount(); + } else { + sinkCounter.incrementBatchUnderflowCount(); + } break; } else { serializer.initialize(event, columnFamily); @@ -236,6 +249,11 @@ public class HBaseSink extends AbstractSink implements Configurable { incs.addAll(serializer.getIncrements()); } } + if (i == batchSize) { + sinkCounter.incrementBatchCompleteCount(); + } + sinkCounter.addToEventDrainAttemptCount(i); + putEventsAndCommit(actions, incs, txn); return status; } @@ -272,7 +290,7 @@ public class HBaseSink extends AbstractSink implements Configurable { }); txn.commit(); - counterGroup.incrementAndGet("transaction.success"); + sinkCounter.addToEventDrainSuccessCount(actions.size()); } catch (Throwable e) { try{ txn.rollback(); @@ -280,7 +298,6 @@ public class HBaseSink extends AbstractSink implements Configurable { logger.error("Exception in rollback. Rollback might not have been" + "successful." , e2); } - counterGroup.incrementAndGet("transaction.rollback"); logger.error("Failed to commit transaction." + "Transaction rolled back.", e); if(e instanceof Error || e instanceof RuntimeException){
