HBASE-19635 Introduce a thread at RS side to call reportProcedureDone

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4d5bec43
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4d5bec43
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4d5bec43

Branch: refs/heads/HBASE-19397-branch-2
Commit: 4d5bec4366a4648aec7db547657720876ac129fd
Parents: e807fd5
Author: zhangduo <zhang...@apache.org>
Authored: Wed Dec 27 20:13:42 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Tue Jan 30 09:24:47 2018 +0800

----------------------------------------------------------------------
 .../src/main/protobuf/RegionServerStatus.proto  |   5 +-
 .../hadoop/hbase/master/MasterRpcServices.java  |  15 ++-
 .../hbase/regionserver/HRegionServer.java       |  72 ++++--------
 .../RemoteProcedureResultReporter.java          | 111 +++++++++++++++++++
 .../handler/RSProcedureHandler.java             |   2 +-
 5 files changed, 149 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4d5bec43/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto 
b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
index 4f75941..3f836cd 100644
--- a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
@@ -146,7 +146,7 @@ message RegionSpaceUseReportRequest {
 message RegionSpaceUseReportResponse {
 }
 
-message ReportProcedureDoneRequest {
+message RemoteProcedureResult {
   required uint64 proc_id = 1;
   enum Status {
     SUCCESS = 1;
@@ -155,6 +155,9 @@ message ReportProcedureDoneRequest {
   required Status status = 2;
   optional ForeignExceptionMessage error = 3;
 }
+message ReportProcedureDoneRequest {
+  repeated RemoteProcedureResult result = 1;
+}
 
 message ReportProcedureDoneResponse {
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4d5bec43/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 72bf2d1..377a9c6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -265,6 +265,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportResponse;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
@@ -2254,12 +2255,14 @@ public class MasterRpcServices extends RSRpcServices
   @Override
   public ReportProcedureDoneResponse reportProcedureDone(RpcController 
controller,
       ReportProcedureDoneRequest request) throws ServiceException {
-    if (request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS) {
-      master.remoteProcedureCompleted(request.getProcId());
-    } else {
-      master.remoteProcedureFailed(request.getProcId(),
-        RemoteProcedureException.fromProto(request.getError()));
-    }
+    request.getResultList().forEach(result -> {
+      if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) {
+        master.remoteProcedureCompleted(result.getProcId());
+      } else {
+        master.remoteProcedureFailed(result.getProcId(),
+          RemoteProcedureException.fromProto(result.getError()));
+      }
+    });
     return ReportProcedureDoneResponse.getDefaultInstance();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4d5bec43/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3c8ec17..3844415 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -146,7 +146,6 @@ import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
 import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
@@ -382,6 +381,9 @@ public class HRegionServer extends HasThread implements
   // eclipse warning when accessed by inner classes
   protected LogRoller walRoller;
 
+  // A thread which calls reportProcedureDone
+  private RemoteProcedureResultReporter procedureResultReporter;
+
   // flag set after we're done setting up server threads
   final AtomicBoolean online = new AtomicBoolean(false);
 
@@ -1887,6 +1889,7 @@ public class HRegionServer extends HasThread implements
 
     this.walRoller = new LogRoller(this, this);
     this.flushThroughputController = 
FlushThroughputControllerFactory.create(this, conf);
+    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
 
     // Create the CompactedFileDischarger chore executorService. This chore 
helps to
     // remove the compacted files
@@ -1930,6 +1933,8 @@ public class HRegionServer extends HasThread implements
     Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + 
".logRoller",
     uncaughtExceptionHandler);
     this.cacheFlusher.start(uncaughtExceptionHandler);
+    Threads.setDaemonThreadRunning(this.procedureResultReporter,
+      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
 
     if (this.compactionChecker != null) 
choreService.scheduleChore(compactionChecker);
     if (this.periodicFlusher != null) 
choreService.scheduleChore(periodicFlusher);
@@ -3712,55 +3717,26 @@ public class HRegionServer extends HasThread implements
     executorService.submit(new RSProcedureHandler(this, procId, callable));
   }
 
-  public void reportProcedureDone(long procId, Throwable error) {
-    ReportProcedureDoneRequest.Builder builder =
-      ReportProcedureDoneRequest.newBuilder().setProcId(procId);
-    if (error != null) {
-      builder.setStatus(ReportProcedureDoneRequest.Status.ERROR)
-          
.setError(ForeignExceptionUtil.toProtoForeignException(serverName.toString(), 
error));
-    } else {
-      builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS);
+  public void remoteProcedureComplete(long procId, Throwable error) {
+    procedureResultReporter.complete(procId, error);
+  }
+
+  void reportProcedureDone(ReportProcedureDoneRequest request) throws 
IOException {
+    RegionServerStatusService.BlockingInterface rss = rssStub;
+    for (;;) {
+      rss = rssStub;
+      if (rss != null) {
+        break;
+      }
+      createRegionServerStatusStub();
     }
-    ReportProcedureDoneRequest request = builder.build();
-    int tries = 0;
-    long pauseTime = INIT_PAUSE_TIME_MS;
-    while (keepLooping()) {
-      RegionServerStatusService.BlockingInterface rss = rssStub;
-      try {
-        if (rss == null) {
-          createRegionServerStatusStub();
-          continue;
-        }
-        rss.reportProcedureDone(null, request);
-        // Log if we had to retry else don't log unless TRACE. We want to
-        // know if were successful after an attempt showed in logs as failed.
-        if (tries > 0 || LOG.isTraceEnabled()) {
-          LOG.info("PROCEDURE REPORTED " + request);
-        }
-        return;
-      } catch (ServiceException se) {
-        IOException ioe = ProtobufUtil.getRemoteException(se);
-        boolean pause =
-          ioe instanceof ServerNotRunningYetException || ioe instanceof 
PleaseHoldException;
-        if (pause) {
-          // Do backoff else we flood the Master with requests.
-          pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
-        } else {
-          pauseTime = INIT_PAUSE_TIME_MS; // Reset.
-        }
-        LOG.info(
-          "Failed to report transition " + 
TextFormat.shortDebugString(request) + "; retry (#" +
-            tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master 
is coming online...)."
-              : " immediately."),
-          ioe);
-        if (pause) {
-          Threads.sleep(pauseTime);
-        }
-        tries++;
-        if (rssStub == rss) {
-          rssStub = null;
-        }
+    try {
+      rss.reportProcedureDone(null, request);
+    } catch (ServiceException se) {
+      if (rssStub == rss) {
+        rssStub = null;
       }
+      throw ProtobufUtil.getRemoteException(se);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/4d5bec43/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
new file mode 100644
index 0000000..e4be422
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.hadoop.hbase.PleaseHoldException;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
+
+/**
+ * A thread which calls {@code reportProcedureDone} to tell master the result 
of a remote procedure.
+ */
+@InterfaceAudience.Private
+class RemoteProcedureResultReporter extends Thread {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RemoteProcedureResultReporter.class);
+
+  // Time to pause if master says 'please hold'. Make configurable if needed.
+  private static final int INIT_PAUSE_TIME_MS = 1000;
+
+  private static final int MAX_BATCH = 100;
+
+  private final HRegionServer server;
+
+  private final LinkedBlockingQueue<RemoteProcedureResult> results = new 
LinkedBlockingQueue<>();
+
+  public RemoteProcedureResultReporter(HRegionServer server) {
+    this.server = server;
+  }
+
+  public void complete(long procId, Throwable error) {
+    RemoteProcedureResult.Builder builder = 
RemoteProcedureResult.newBuilder().setProcId(procId);
+    if (error != null) {
+      builder.setStatus(RemoteProcedureResult.Status.ERROR).setError(
+        
ForeignExceptionUtil.toProtoForeignException(server.getServerName().toString(), 
error));
+    } else {
+      builder.setStatus(RemoteProcedureResult.Status.SUCCESS);
+    }
+    results.add(builder.build());
+  }
+
+  @Override
+  public void run() {
+    ReportProcedureDoneRequest.Builder builder = 
ReportProcedureDoneRequest.newBuilder();
+    int tries = 0;
+    while (!server.isStopped()) {
+      if (builder.getResultCount() == 0) {
+        try {
+          builder.addResult(results.take());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          continue;
+        }
+      }
+      while (builder.getResultCount() < MAX_BATCH) {
+        RemoteProcedureResult result = results.poll();
+        if (result == null) {
+          break;
+        }
+        builder.addResult(result);
+      }
+      ReportProcedureDoneRequest request = builder.build();
+      try {
+        server.reportProcedureDone(builder.build());
+        builder.clear();
+        tries = 0;
+      } catch (IOException e) {
+        boolean pause =
+          e instanceof ServerNotRunningYetException || e instanceof 
PleaseHoldException;
+        long pauseTime;
+        if (pause) {
+          // Do backoff else we flood the Master with requests.
+          pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
+        } else {
+          pauseTime = INIT_PAUSE_TIME_MS; // Reset.
+        }
+        LOG.info("Failed report procedure " + 
TextFormat.shortDebugString(request) + "; retry (#" +
+          tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master is 
coming online...)."
+            : " immediately."),
+          e);
+        Threads.sleep(pauseTime);
+        tries++;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/4d5bec43/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
index 240b0a7..d2175d0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
@@ -49,6 +49,6 @@ public class RSProcedureHandler extends EventHandler {
       LOG.error("Catch exception when call RSProcedureCallable: ", e);
       error = e;
     }
-    ((HRegionServer) server).reportProcedureDone(procId, error);
+    ((HRegionServer) server).remoteProcedureComplete(procId, error);
   }
 }

Reply via email to