infraio commented on a change in pull request #2579:
URL: https://github.com/apache/hbase/pull/2579#discussion_r511741505



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
##########
@@ -388,4 +446,152 @@ protected ReplicationServerRpcServices 
createRpcServices() throws IOException {
   protected boolean setAbortRequested() {
     return abortRequested.compareAndSet(false, true);
   }
+
+  protected void tryReplicationServerReport(long reportStartTime, long 
reportEndTime)
+      throws IOException {
+    ReplicationServerStatusService.BlockingInterface rss = rssStub;
+    if (rss == null) {
+      createReplicationServerStatusStub(true);
+      rss = rssStub;
+      if (rss == null) {
+        return;
+      }
+    }
+    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, 
reportEndTime);
+    try {
+      RegionServerReportRequest.Builder request = RegionServerReportRequest
+          .newBuilder();
+      request.setServer(ProtobufUtil.toServerName(this.serverName));
+      request.setLoad(sl);
+      rss.replicationServerReport(null, request.build());
+    } catch (ServiceException se) {
+      IOException ioe = ProtobufUtil.getRemoteException(se);
+      if (ioe instanceof YouAreDeadException) {
+        // This will be caught and handled as a fatal error in run()
+        throw ioe;
+      }
+      if (rssStub == rss) {
+        rssStub = null;
+      }
+      // Couldn't connect to the master, get location from zk and reconnect
+      // Method blocks until new master is found or we are stopped
+      createReplicationServerStatusStub(true);
+    }
+  }
+
+  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, 
long reportEndTime) {
+    long usedMemory = -1L;
+    long maxMemory = -1L;
+    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
+    if (usage != null) {
+      usedMemory = usage.getUsed();
+      maxMemory = usage.getMax();
+    }
+
+    ClusterStatusProtos.ServerLoad.Builder serverLoad = 
ClusterStatusProtos.ServerLoad.newBuilder();
+    serverLoad.setTotalNumberOfRequests(rpcServices.requestCount.sum());
+    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
+    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
+
+    serverLoad.setReportStartTime(reportStartTime);
+    serverLoad.setReportEndTime(reportEndTime);
+
+    // for the replicationLoad purpose. Only need to get from one 
executorService
+    // either source or sink will get the same info
+    ReplicationSinkService sinks = getReplicationSinkService();
+
+    if (sinks != null) {
+      // always refresh first to get the latest value
+      ReplicationLoad rLoad = sinks.refreshAndGetReplicationLoad();
+      if (rLoad != null) {
+        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
+      }
+    }
+    return serverLoad.build();
+  }
+
+  /**
+   * Get the current master from ZooKeeper and open the RPC connection to it. 
To get a fresh
+   * connection, the current rssStub must be null. Method will block until a 
master is available.
+   * You can break from this block by requesting the server stop.
+   * @param refresh If true then master address will be read from ZK, 
otherwise use cached data
+   * @return master + port, or null if server has been stopped
+   */
+  protected synchronized ServerName createReplicationServerStatusStub(boolean 
refresh) {

Review comment:
       Ditto.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to