Repository: hbase
Updated Branches:
  refs/heads/branch-1 e029c554b -> 4b3df0f92


HBASE-17296 Provide per peer throttling for replication

Signed-off-by: Phil Yang <yangzhe1...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4b3df0f9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4b3df0f9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4b3df0f9

Branch: refs/heads/branch-1
Commit: 4b3df0f926f6ec64bd6e071b1ae18ff635b4cc98
Parents: e029c55
Author: Guanghao Zhang <zghao...@gmail.com>
Authored: Wed Dec 14 16:34:55 2016 +0800
Committer: Phil Yang <yangzhe1...@apache.org>
Committed: Thu Dec 15 16:13:04 2016 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationPeer.java      |   6 +
 .../replication/ReplicationPeerConfig.java      |  13 +-
 .../replication/ReplicationPeerZKImpl.java      |   6 +
 .../replication/ReplicationPeersZKImpl.java     |   1 +
 .../replication/ReplicationSerDeHelper.java     |   5 +
 .../protobuf/generated/ZooKeeperProtos.java     | 120 ++++++++++++++++---
 .../src/main/protobuf/ZooKeeper.proto           |   1 +
 .../regionserver/ReplicationSource.java         |  32 ++++-
 .../regionserver/ReplicationThrottler.java      |   9 +-
 .../replication/TestReplicationAdmin.java       |  17 +++
 .../src/main/ruby/hbase/replication_admin.rb    |   9 ++
 hbase-shell/src/main/ruby/shell.rb              |   1 +
 .../src/main/ruby/shell/commands/list_peers.rb  |   5 +-
 .../ruby/shell/commands/set_peer_bandwidth.rb   |  42 +++++++
 .../test/ruby/hbase/replication_admin_test.rb   |  18 +++
 15 files changed, 263 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index 9a6af4f..200d81c 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -71,6 +71,12 @@ public interface ReplicationPeer {
   public Map<TableName, List<String>> getTableCFs();
 
   /**
+   * Get the per node bandwidth upper limit for this peer
+   * @return the bandwidth upper limit
+   */
+  public long getPeerBandwidth();
+
+  /**
    * Setup a callback for chanages to the replication peer config
    * @param listener Listener for config changes, usually a replication 
endpoint
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index e2c7bc7..eee521c 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -41,6 +41,7 @@ public class ReplicationPeerConfig {
   private final Map<byte[], byte[]> peerData;
   private final Map<String, String> configuration;
   private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
+  private long bandwidth = 0;
 
   public ReplicationPeerConfig() {
     this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
@@ -89,13 +90,23 @@ public class ReplicationPeerConfig {
     this.tableCFsMap = tableCFsMap;
   }
 
+  public long getBandwidth() {
+    return this.bandwidth;
+  }
+
+  public ReplicationPeerConfig setBandwidth(long bandwidth) {
+    this.bandwidth = bandwidth;
+    return this;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new 
StringBuilder("clusterKey=").append(clusterKey).append(",");
     builder.append("replicationEndpointImpl=").append(replicationEndpointImpl);
     if (tableCFsMap != null) {
-      builder.append(tableCFsMap.toString());
+      builder.append(tableCFsMap.toString()).append(",");
     }
+    builder.append("bandwidth=").append(bandwidth);
     return builder.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index 049a142..b79a982 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -163,6 +163,12 @@ public class ReplicationPeerZKImpl extends 
ReplicationStateZKBase implements Rep
     return this.tableCFs;
   }
 
+
+  @Override
+  public long getPeerBandwidth() {
+    return this.peerConfig.getBandwidth();
+  }
+
   @Override
   public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
     if (this.peerConfigTracker != null){

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 331e8bf..9e3c92e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -375,6 +375,7 @@ public class ReplicationPeersZKImpl extends 
ReplicationStateZKBase implements Re
     // or data that weren't explicitly changed
     existingConfig.getConfiguration().putAll(newConfig.getConfiguration());
     existingConfig.getPeerData().putAll(newConfig.getPeerData());
+    existingConfig.setBandwidth(newConfig.getBandwidth());
     try {
       ZKUtil.setData(this.zookeeper, getPeerNode(id),
           ReplicationSerDeHelper.toByteArray(existingConfig));

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
index cdb95f7f..ae511e8 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java
@@ -264,6 +264,10 @@ public final class ReplicationSerDeHelper {
     if (tableCFsMap != null) {
       peerConfig.setTableCFsMap(tableCFsMap);
     }
+
+    if (peer.hasBandwidth()) {
+      peerConfig.setBandwidth(peer.getBandwidth());
+    }
     return peerConfig;
   }
 
@@ -308,6 +312,7 @@ public final class ReplicationSerDeHelper {
       }
     }
 
+    builder.setBandwidth(peerConfig.getBandwidth());
     return builder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
----------------------------------------------------------------------
diff --git 
a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
 
b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index 955995f..fb06a78 100644
--- 
a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ 
b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -5867,6 +5867,16 @@ public final class ZooKeeperProtos {
      */
     
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder 
getTableCfsOrBuilder(
         int index);
+
+    // optional int64 bandwidth = 6;
+    /**
+     * <code>optional int64 bandwidth = 6;</code>
+     */
+    boolean hasBandwidth();
+    /**
+     * <code>optional int64 bandwidth = 6;</code>
+     */
+    long getBandwidth();
   }
   /**
    * Protobuf type {@code hbase.pb.ReplicationPeer}
@@ -5958,6 +5968,11 @@ public final class ZooKeeperProtos {
               
tableCfs_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.PARSER,
 extensionRegistry));
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000004;
+              bandwidth_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -6216,12 +6231,29 @@ public final class ZooKeeperProtos {
       return tableCfs_.get(index);
     }
 
+    // optional int64 bandwidth = 6;
+    public static final int BANDWIDTH_FIELD_NUMBER = 6;
+    private long bandwidth_;
+    /**
+     * <code>optional int64 bandwidth = 6;</code>
+     */
+    public boolean hasBandwidth() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional int64 bandwidth = 6;</code>
+     */
+    public long getBandwidth() {
+      return bandwidth_;
+    }
+
     private void initFields() {
       clusterkey_ = "";
       replicationEndpointImpl_ = "";
       data_ = java.util.Collections.emptyList();
       configuration_ = java.util.Collections.emptyList();
       tableCfs_ = java.util.Collections.emptyList();
+      bandwidth_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -6272,6 +6304,9 @@ public final class ZooKeeperProtos {
       for (int i = 0; i < tableCfs_.size(); i++) {
         output.writeMessage(5, tableCfs_.get(i));
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt64(6, bandwidth_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -6301,6 +6336,10 @@ public final class ZooKeeperProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(5, tableCfs_.get(i));
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(6, bandwidth_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -6340,6 +6379,11 @@ public final class ZooKeeperProtos {
           .equals(other.getConfigurationList());
       result = result && getTableCfsList()
           .equals(other.getTableCfsList());
+      result = result && (hasBandwidth() == other.hasBandwidth());
+      if (hasBandwidth()) {
+        result = result && (getBandwidth()
+            == other.getBandwidth());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -6373,6 +6417,10 @@ public final class ZooKeeperProtos {
         hash = (37 * hash) + TABLE_CFS_FIELD_NUMBER;
         hash = (53 * hash) + getTableCfsList().hashCode();
       }
+      if (hasBandwidth()) {
+        hash = (37 * hash) + BANDWIDTH_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getBandwidth());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -6512,6 +6560,8 @@ public final class ZooKeeperProtos {
         } else {
           tableCfsBuilder_.clear();
         }
+        bandwidth_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
 
@@ -6575,6 +6625,10 @@ public final class ZooKeeperProtos {
         } else {
           result.tableCfs_ = tableCfsBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.bandwidth_ = bandwidth_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -6679,6 +6733,9 @@ public final class ZooKeeperProtos {
             }
           }
         }
+        if (other.hasBandwidth()) {
+          setBandwidth(other.getBandwidth());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -7626,6 +7683,39 @@ public final class ZooKeeperProtos {
         return tableCfsBuilder_;
       }
 
+      // optional int64 bandwidth = 6;
+      private long bandwidth_ ;
+      /**
+       * <code>optional int64 bandwidth = 6;</code>
+       */
+      public boolean hasBandwidth() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional int64 bandwidth = 6;</code>
+       */
+      public long getBandwidth() {
+        return bandwidth_;
+      }
+      /**
+       * <code>optional int64 bandwidth = 6;</code>
+       */
+      public Builder setBandwidth(long value) {
+        bitField0_ |= 0x00000020;
+        bandwidth_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 bandwidth = 6;</code>
+       */
+      public Builder clearBandwidth() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        bandwidth_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationPeer)
     }
 
@@ -10915,24 +11005,24 @@ public final class ZooKeeperProtos {
       
"ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISABLE" +
       
"D\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007Tabl"
 +
       "eCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Table" +
-      "Name\022\020\n\010families\030\002 
\003(\014\"\305\001\n\017ReplicationPe" +
+      "Name\022\020\n\010families\030\002 
\003(\014\"\330\001\n\017ReplicationPe" +
       "er\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replicationEnd" +
       "pointImpl\030\002 \001(\t\022&\n\004data\030\003 
\003(\0132\030.hbase.pb" +
       ".BytesBytesPair\022/\n\rconfiguration\030\004 \003(\0132\030" +
       ".hbase.pb.NameStringPair\022$\n\ttable_cfs\030\005 " +
-      "\003(\0132\021.hbase.pb.TableCF\"g\n\020ReplicationSta" +
-      "te\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replication",
-      
"State.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DIS" +
-      "ABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010po" +
-      "sition\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlock_" +
-      "owner\030\001 \002(\t\"\252\001\n\tTableLock\022\'\n\ntable_name\030" +
-      "\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_owner" +
-      "\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthread_i" +
-      "d\030\003 \001(\003\022\021\n\tis_shared\030\004 
\001(\010\022\017\n\007purpose\030\005 " +
-      "\001(\t\022\023\n\013create_time\030\006 
\001(\003\"\036\n\013SwitchState\022" +
-      "\017\n\007enabled\030\001 \001(\010BE\n*org.apache.hadoop.hb" +
-      "ase.protobuf.generatedB\017ZooKeeperProtosH",
-      "\001\210\001\001\240\001\001"
+      "\003(\0132\021.hbase.pb.TableCF\022\021\n\tbandwidth\030\006 \001(" +
+      "\003\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\0162 .h",
+      "base.pb.ReplicationState.State\"\"\n\005State\022" +
+      
"\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replicatio" +
+      "nHLogPosition\022\020\n\010position\030\001 \002(\003\"%\n\017Repli" +
+      "cationLock\022\022\n\nlock_owner\030\001 \002(\t\"\252\001\n\tTable" +
+      "Lock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Tabl" +
+      "eName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.pb.Ser" +
+      "verName\022\021\n\tthread_id\030\003 
\001(\003\022\021\n\tis_shared\030" +
+      "\004 \001(\010\022\017\n\007purpose\030\005 
\001(\t\022\023\n\013create_time\030\006 " +
+      "\001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 
\001(\010BE\n*o" +
+      "rg.apache.hadoop.hbase.protobuf.generate",
+      "dB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -10986,7 +11076,7 @@ public final class ZooKeeperProtos {
           internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_ReplicationPeer_descriptor,
-              new java.lang.String[] { "Clusterkey", 
"ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", });
+              new java.lang.String[] { "Clusterkey", 
"ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Bandwidth", });
           internal_static_hbase_pb_ReplicationState_descriptor =
             getDescriptor().getMessageTypes().get(8);
           internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-protocol/src/main/protobuf/ZooKeeper.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto 
b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
index 60ed229..a632552 100644
--- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -135,6 +135,7 @@ message ReplicationPeer {
   repeated BytesBytesPair data = 3;
   repeated NameStringPair configuration = 4;
   repeated TableCF table_cfs = 5;
+  optional int64 bandwidth = 6;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 7fd7d94..63549d0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -144,6 +145,8 @@ public class ReplicationSource extends Thread
   private WALEntryFilter walEntryFilter;
   // throttler
   private ReplicationThrottler throttler;
+  private long defaultBandwidth;
+  private long currentBandwidth;
   private ConcurrentHashMap<String, ReplicationSourceWorkerThread> 
workerThreads =
       new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
 
@@ -179,8 +182,6 @@ public class ReplicationSource extends Thread
     this.maxRetriesMultiplier =
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 
minutes @ 1 sec per
     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 
32);
-    long bandwidth = 
this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
-    this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
     this.replicationQueues = replicationQueues;
     this.replicationPeers = replicationPeers;
     this.manager = manager;
@@ -196,6 +197,15 @@ public class ReplicationSource extends Thread
     this.actualPeerId = replicationQueueInfo.getPeerId();
     this.logQueueWarnThreshold = 
this.conf.getInt("replication.source.log.queue.warn", 2);
     this.replicationEndpoint = replicationEndpoint;
+
+    defaultBandwidth = 
this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
+    currentBandwidth = getCurrentBandwidth();
+    this.throttler = new ReplicationThrottler((double) currentBandwidth / 
10.0);
+
+    LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " 
+ peerId
+        + " inited, replicationQueueSizeCapacity=" + 
replicationQueueSizeCapacity
+        + ", replicationQueueNbCapacity=" + replicationQueueNbCapacity + ", 
curerntBandwidth="
+        + this.currentBandwidth);
   }
 
   private void decorateConf() {
@@ -494,6 +504,13 @@ public class ReplicationSource extends Thread
     return this.metrics;
   }
 
+  private long getCurrentBandwidth() {
+    ReplicationPeer replicationPeer = this.replicationPeers.getPeer(peerId);
+    long peerBandwidth = replicationPeer != null ? 
replicationPeer.getPeerBandwidth() : 0;
+    // user can set peer bandwidth to 0 to use default bandwidth
+    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
+  }
+
   public class ReplicationSourceWorkerThread extends Thread {
     private ReplicationSource source;
     private String walGroupId;
@@ -1087,6 +1104,16 @@ public class ReplicationSource extends Thread
       return distinctRowKeys + totalHFileEntries;
     }
 
+    private void checkBandwidthChangeAndResetThrottler() {
+      long peerBandwidth = getCurrentBandwidth();
+      if (peerBandwidth != currentBandwidth) {
+        currentBandwidth = peerBandwidth;
+        throttler.setBandwidth((double) currentBandwidth / 10.0);
+        LOG.info("ReplicationSource : " + peerId
+            + " bandwidth throttling changed, currentBandWidth=" + 
currentBandwidth);
+      }
+    }
+
     /**
      * Do the shipping logic
      * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
@@ -1101,6 +1128,7 @@ public class ReplicationSource extends Thread
       }
       while (isWorkerActive()) {
         try {
+          checkBandwidthChangeAndResetThrottler();
           if (throttler.isEnabled()) {
             long sleepTicks = throttler.getNextSleepInterval(currentSize);
             if (sleepTicks > 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
index c756576..8da9352 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  */
 @InterfaceAudience.Private
 public class ReplicationThrottler {
-  private final boolean enabled;
-  private final double bandwidth;
+  private boolean enabled;
+  private double bandwidth;
   private long cyclePushSize;
   private long cycleStartTick;
 
@@ -118,4 +118,9 @@ public class ReplicationThrottler {
       this.cycleStartTick = EnvironmentEdgeManager.currentTime();
     }
   }
+
+  public void setBandwidth(double bandwidth) {
+    this.bandwidth = bandwidth;
+    this.enabled = this.bandwidth > 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index b5627c0..cf7b236 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -363,4 +363,21 @@ public class TestReplicationAdmin {
     assertNull(admin.getPeerConfig(ID_ONE).getTableCFsMap());
     admin.removePeer(ID_ONE);
   }
+
+  @Test
+  public void testPeerBandwidth() throws ReplicationException {
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(KEY_ONE);
+    admin.addPeer(ID_ONE, rpc);
+    admin.peerAdded(ID_ONE);
+
+    rpc = admin.getPeerConfig(ID_ONE);
+    assertEquals(0, rpc.getBandwidth());
+
+    rpc.setBandwidth(2097152);
+    admin.updatePeerConfig(ID_ONE, rpc);
+
+    assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
+    admin.removePeer(ID_ONE);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb 
b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index 2e240e1..f0da3ae 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -182,6 +182,15 @@ module Hbase
       @replication_admin.removePeerTableCFs(id, map)
     end
 
+    # Set new bandwidth config for the specified peer
+    def set_peer_bandwidth(id, bandwidth)
+      rpc = get_peer_config(id)
+      unless rpc.nil?
+        rpc.setBandwidth(bandwidth)
+        @replication_admin.updatePeerConfig(id, rpc)
+      end
+    end
+
     
#----------------------------------------------------------------------------------------------
     # Enables a table's replication switch
     def enable_tablerep(table_name)

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-shell/src/main/ruby/shell.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell.rb 
b/hbase-shell/src/main/ruby/shell.rb
index 963e369..f94e334 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -359,6 +359,7 @@ Shell.load_command_group(
     disable_peer
     show_peer_tableCFs
     set_peer_tableCFs
+    set_peer_bandwidth
     list_replicated_tables
     append_peer_tableCFs
     remove_peer_tableCFs

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb 
b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
index 6444c79..82b5237 100644
--- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
@@ -34,13 +34,14 @@ EOF
         peers = replication_admin.list_peers
 
         formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME",
-          "STATE", "TABLE_CFS"])
+          "STATE", "TABLE_CFS", "BANDWIDTH"])
 
         peers.entrySet().each do |e|
           state = replication_admin.get_peer_state(e.key)
           tableCFs = replication_admin.show_peer_tableCFs(e.key)
           formatter.row([ e.key, e.value.getClusterKey,
-            e.value.getReplicationEndpointImpl, state, tableCFs ])
+            e.value.getReplicationEndpointImpl, state, tableCFs,
+            e.value.getBandwidth ])
         end
 
         formatter.footer(now)

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb 
b/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb
new file mode 100644
index 0000000..d9495af
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_bandwidth.rb
@@ -0,0 +1,42 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    class SetPeerBandwidth< Command
+      def help
+        return <<-EOF
+Set the replication source per node bandwidth for the specified peer.
+Examples:
+
+  # set bandwidth=2MB per regionserver for a peer
+  hbase> set_peer_bandwidth '1', 2097152
+  # unset bandwidth for a peer to use the default bandwidth configured in 
server-side
+  hbase> set_peer_bandwidth '1'
+
+EOF
+      end
+
+      def command(id, bandwidth = 0)
+        replication_admin.set_peer_bandwidth(id, bandwidth)
+      end
+    end
+  end
+end

http://git-wip-us.apache.org/repos/asf/hbase/blob/4b3df0f9/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb 
b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 84bdf56..b73739b 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -198,6 +198,24 @@ module Hbase
       replication_admin.remove_peer(@peer_id)
     end
 
+    define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
+      cluster_key = "localhost:2181:/hbase-test"
+      args = { CLUSTER_KEY => cluster_key }
+      replication_admin.add_peer(@peer_id, args)
+      # Normally the ReplicationSourceManager will call 
ReplicationPeer#peer_added
+      # but here we have to do it ourselves
+      replication_admin.peer_added(@peer_id)
+
+      peer_config = replication_admin.get_peer_config(@peer_id)
+      assert_equal(0, peer_config.get_bandwidth)
+      replication_admin.set_peer_bandwidth(@peer_id, 2097152)
+      peer_config = replication_admin.get_peer_config(@peer_id)
+      assert_equal(2097152, peer_config.get_bandwidth)
+
+      #cleanup
+      replication_admin.remove_peer(@peer_id)
+    end
+
     define_test "get_peer_config: works with simple clusterKey peer" do
       cluster_key = "localhost:2181:/hbase-test"
       args = { CLUSTER_KEY => cluster_key }

Reply via email to