HBASE-15740 Replication source.shippedKBs metric is undercounting because it is in KB


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4c214b50
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4c214b50
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4c214b50

Branch: refs/heads/branch-1.3
Commit: 4c214b50c6efae02c0eb054b5ddf514469776a9f
Parents: 4ef5b4e
Author: Enis Soztutar <e...@apache.org>
Authored: Mon May 9 10:25:49 2016 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Mon May 9 10:27:23 2016 -0700

----------------------------------------------------------------------
 .../MetricsReplicationSourceSource.java         |  5 +++-
 .../MetricsReplicationGlobalSourceSource.java   | 25 ++++++++++++++++++--
 .../MetricsReplicationSourceSourceImpl.java     | 13 ++++++++--
 .../replication/regionserver/MetricsSource.java | 10 ++++----
 .../regionserver/ReplicationSource.java         |  2 +-
 .../DummyRegionServerEndpointProtos.java        | 21 ++++++++--------
 6 files changed, 54 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4c214b50/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 3aa01ab..271f0ac 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -24,7 +24,10 @@ public interface MetricsReplicationSourceSource {
   public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
   public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
 
+  @Deprecated
+  /** @deprecated Use SOURCE_SHIPPED_BYTES instead */
   public static final String SOURCE_SHIPPED_KBS = "source.shippedKBs";
+  public static final String SOURCE_SHIPPED_BYTES = "source.shippedBytes";
   public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
 
   public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes";
@@ -41,7 +44,7 @@ public interface MetricsReplicationSourceSource {
   void incrLogEditsFiltered(long size);
   void incrBatchesShipped(int batches);
   void incrOpsShipped(long ops);
-  void incrShippedKBs(long size);
+  void incrShippedBytes(long size);
   void incrLogReadInBytes(long size);
   void incrLogReadInEdits(long size);
   void clear();

http://git-wip-us.apache.org/repos/asf/hbase/blob/4c214b50/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 2526f32..476d2f7 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -30,6 +30,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   private final MutableFastCounter logEditsFilteredCounter;
   private final MutableFastCounter shippedBatchesCounter;
   private final MutableFastCounter shippedOpsCounter;
+  private final MutableFastCounter shippedBytesCounter;
+  @Deprecated
   private final MutableFastCounter shippedKBsCounter;
   private final MutableFastCounter logReadInBytesCounter;
   private final MutableFastCounter shippedHFilesCounter;
@@ -48,6 +50,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
 
     shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L);
 
+    shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
+
     logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L);
 
     logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
@@ -88,8 +92,25 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
     shippedOpsCounter.incr(ops);
   }
 
-  @Override public void incrShippedKBs(long size) {
-    shippedKBsCounter.incr(size);
+  @Override public void incrShippedBytes(long size) {
+    shippedBytesCounter.incr(size);
+    // obtained value may be smaller than 1024. We should make sure that KB count
+    // eventually picks up even from multiple smaller updates.
+    incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
+  }
+
+  static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) {
+    // Following code should be thread-safe.
+    long delta = 0;
+    while(true) {
+      long bytes = bytesCounter.value();
+      delta = (bytes / 1024) - kbsCounter.value();
+      if (delta > 0) {
+        kbsCounter.incr(delta);
+      } else {
+        break;
+      }
+    }
   }
 
   @Override public void incrLogReadInBytes(long size) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4c214b50/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 03e3116..835e81c 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -30,7 +30,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String logEditsFilteredKey;
   private final String shippedBatchesKey;
   private final String shippedOpsKey;
+  @Deprecated
   private final String shippedKBsKey;
+  private final String shippedBytesKey;
   private final String logReadInBytesKey;
   private final String shippedHFilesKey;
   private final String sizeOfHFileRefsQueueKey;
@@ -42,6 +44,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final MutableFastCounter shippedBatchesCounter;
   private final MutableFastCounter shippedOpsCounter;
   private final MutableFastCounter shippedKBsCounter;
+  private final MutableFastCounter shippedBytesCounter;
   private final MutableFastCounter logReadInBytesCounter;
   private final MutableFastCounter shippedHFilesCounter;
   private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
@@ -65,6 +68,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     shippedKBsKey = "source." + this.id + ".shippedKBs";
     shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L);
 
+    shippedBytesKey = "source." + this.id + ".shippedBytes";
+    shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
+
     logReadInBytesKey = "source." + this.id + ".logReadInBytes";
     logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L);
 
@@ -109,8 +115,10 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     shippedOpsCounter.incr(ops);
   }
 
-  @Override public void incrShippedKBs(long size) {
-    shippedKBsCounter.incr(size);
+  @Override public void incrShippedBytes(long size) {
+    shippedBytesCounter.incr(size);
+    MetricsReplicationGlobalSourceSource
+      .incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
   }
 
   @Override public void incrLogReadInBytes(long size) {
@@ -125,6 +133,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     rms.removeMetric(shippedBatchesKey);
     rms.removeMetric(shippedOpsKey);
     rms.removeMetric(shippedKBsKey);
+    rms.removeMetric(shippedBytesKey);
 
     rms.removeMetric(logReadInBytesKey);
     rms.removeMetric(logReadInEditsKey);

http://git-wip-us.apache.org/repos/asf/hbase/blob/4c214b50/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 4a044bf..b07f1d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -136,15 +136,15 @@ public class MetricsSource {
    *
    * @param batchSize the size of the batch that was shipped to sinks.
    */
-  public void shipBatch(long batchSize, int sizeInKB) {
+  public void shipBatch(long batchSize, int sizeInBytes) {
     singleSourceSource.incrBatchesShipped(1);
     globalSourceSource.incrBatchesShipped(1);
 
     singleSourceSource.incrOpsShipped(batchSize);
     globalSourceSource.incrOpsShipped(batchSize);
 
-    singleSourceSource.incrShippedKBs(sizeInKB);
-    globalSourceSource.incrShippedKBs(sizeInKB);
+    singleSourceSource.incrShippedBytes(sizeInBytes);
+    globalSourceSource.incrShippedBytes(sizeInBytes);
   }
 
   /**
@@ -153,8 +153,8 @@ public class MetricsSource {
    * @param batchSize the size of the batch that was shipped to sinks.
    * @param hfiles total number of hfiles shipped to sinks.
    */
-  public void shipBatch(long batchSize, int sizeInKB, long hfiles) {
-    shipBatch(batchSize, sizeInKB);
+  public void shipBatch(long batchSize, int sizeInBytes, long hfiles) {
+    shipBatch(batchSize, sizeInBytes);
     singleSourceSource.incrHFilesShipped(hfiles);
     globalSourceSource.incrHFilesShipped(hfiles);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4c214b50/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index dc6cf36..bb3af63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -1059,7 +1059,7 @@ public class ReplicationSource extends Thread
           totalReplicatedEdits.addAndGet(entries.size());
           totalReplicatedOperations.addAndGet(currentNbOperations);
           // FIXME check relationship between wal group and overall
-          metrics.shipBatch(currentNbOperations, currentSize / 1024, currentNbHFiles);
+          metrics.shipBatch(currentNbOperations, currentSize, currentNbHFiles);
           metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
             walGroupId);
           if (LOG.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4c214b50/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/protobuf/generated/DummyRegionServerEndpointProtos.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/protobuf/generated/DummyRegionServerEndpointProtos.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/protobuf/generated/DummyRegionServerEndpointProtos.java
index 2ad3c59..a011b30 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/protobuf/generated/DummyRegionServerEndpointProtos.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/protobuf/generated/DummyRegionServerEndpointProtos.java
@@ -1,5 +1,5 @@
 // Generated by the protocol buffer compiler.  DO NOT EDIT!
-// source: hbase-server/src/test/protobuf/DummyRegionServerEndpoint.proto
+// source: DummyRegionServerEndpoint.proto
 
 package org.apache.hadoop.hbase.coprocessor.protobuf.generated;
 
@@ -1185,16 +1185,15 @@ public final class DummyRegionServerEndpointProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n>hbase-server/src/test/protobuf/DummyRe" +
-      "gionServerEndpoint.proto\022\rhbase.test.pb\"" +
-      "\016\n\014DummyRequest\"\036\n\rDummyResponse\022\r\n\005valu" +
-      "e\030\001 
\002(\t2\237\001\n\014DummyService\022F\n\tdummyCall\022\033." +
-      "hbase.test.pb.DummyRequest\032\034.hbase.test." +
-      "pb.DummyResponse\022G\n\ndummyThrow\022\033.hbase.t" +
-      "est.pb.DummyRequest\032\034.hbase.test.pb.Dumm" +
-      "yResponseB_\n6org.apache.hadoop.hbase.cop" +
-      "rocessor.protobuf.generatedB\037DummyRegion" +
-      "ServerEndpointProtos\210\001\001\240\001\001"
+      "\n\037DummyRegionServerEndpoint.proto\022\rhbase" +
+      ".test.pb\"\016\n\014DummyRequest\"\036\n\rDummyRespons" +
+      "e\022\r\n\005value\030\001 
\002(\t2\237\001\n\014DummyService\022F\n\tdum" +
+      "myCall\022\033.hbase.test.pb.DummyRequest\032\034.hb" +
+      "ase.test.pb.DummyResponse\022G\n\ndummyThrow\022" +
+      "\033.hbase.test.pb.DummyRequest\032\034.hbase.tes" +
+      "t.pb.DummyResponseB_\n6org.apache.hadoop." +
+      "hbase.coprocessor.protobuf.generatedB\037Du" +
+      "mmyRegionServerEndpointProtos\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

Reply via email to