HBASE-19635 Introduce a thread at RS side to call reportProcedureDone
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4d5bec43 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4d5bec43 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4d5bec43 Branch: refs/heads/HBASE-19397-branch-2 Commit: 4d5bec4366a4648aec7db547657720876ac129fd Parents: e807fd5 Author: zhangduo <zhang...@apache.org> Authored: Wed Dec 27 20:13:42 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Tue Jan 30 09:24:47 2018 +0800 ---------------------------------------------------------------------- .../src/main/protobuf/RegionServerStatus.proto | 5 +- .../hadoop/hbase/master/MasterRpcServices.java | 15 ++- .../hbase/regionserver/HRegionServer.java | 72 ++++-------- .../RemoteProcedureResultReporter.java | 111 +++++++++++++++++++ .../handler/RSProcedureHandler.java | 2 +- 5 files changed, 149 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/4d5bec43/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto index 4f75941..3f836cd 100644 --- a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto @@ -146,7 +146,7 @@ message RegionSpaceUseReportRequest { message RegionSpaceUseReportResponse { } -message ReportProcedureDoneRequest { +message RemoteProcedureResult { required uint64 proc_id = 1; enum Status { SUCCESS = 1; @@ -155,6 +155,9 @@ message ReportProcedureDoneRequest { required Status status = 2; optional ForeignExceptionMessage error = 3; } +message ReportProcedureDoneRequest { + repeated RemoteProcedureResult result = 1; +} message ReportProcedureDoneResponse { } http://git-wip-us.apache.org/repos/asf/hbase/blob/4d5bec43/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 72bf2d1..377a9c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -265,6 +265,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; @@ -2254,12 +2255,14 @@ public class MasterRpcServices extends RSRpcServices @Override public ReportProcedureDoneResponse reportProcedureDone(RpcController controller, ReportProcedureDoneRequest request) throws ServiceException { - if (request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS) { - master.remoteProcedureCompleted(request.getProcId()); - } else { - master.remoteProcedureFailed(request.getProcId(), - RemoteProcedureException.fromProto(request.getError())); - } + request.getResultList().forEach(result -> { + if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) { + master.remoteProcedureCompleted(result.getProcId()); + } else { + master.remoteProcedureFailed(result.getProcId(), + RemoteProcedureException.fromProto(result.getError())); + } + }); return ReportProcedureDoneResponse.getDefaultInstance(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4d5bec43/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 3c8ec17..3844415 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -146,7 +146,6 @@ import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.JvmPauseMonitor; import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; @@ -382,6 +381,9 @@ public class HRegionServer extends HasThread implements // eclipse warning when accessed by inner classes protected LogRoller walRoller; + // A thread which calls reportProcedureDone + private RemoteProcedureResultReporter procedureResultReporter; + // flag set after we're done setting up server threads final AtomicBoolean online = new AtomicBoolean(false); @@ -1887,6 +1889,7 @@ public class HRegionServer extends HasThread implements this.walRoller = new LogRoller(this, this); this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); + this.procedureResultReporter = new RemoteProcedureResultReporter(this); // Create the CompactedFileDischarger chore executorService. This chore helps to // remove the compacted files @@ -1930,6 +1933,8 @@ public class HRegionServer extends HasThread implements Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); this.cacheFlusher.start(uncaughtExceptionHandler); + Threads.setDaemonThreadRunning(this.procedureResultReporter, + getName() + ".procedureResultReporter", uncaughtExceptionHandler); if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker); if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher); @@ -3712,55 +3717,26 @@ public class HRegionServer extends HasThread implements executorService.submit(new RSProcedureHandler(this, procId, callable)); } - public void reportProcedureDone(long procId, Throwable error) { - ReportProcedureDoneRequest.Builder builder = - ReportProcedureDoneRequest.newBuilder().setProcId(procId); - if (error != null) { - builder.setStatus(ReportProcedureDoneRequest.Status.ERROR) - .setError(ForeignExceptionUtil.toProtoForeignException(serverName.toString(), error)); - } else { - builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS); + public void remoteProcedureComplete(long procId, Throwable error) { + procedureResultReporter.complete(procId, error); + } + + void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { + RegionServerStatusService.BlockingInterface rss = rssStub; + for (;;) { + rss = rssStub; + if (rss != null) { + break; + } + createRegionServerStatusStub(); } - ReportProcedureDoneRequest request = builder.build(); - int tries = 0; - long pauseTime = INIT_PAUSE_TIME_MS; - while (keepLooping()) { - RegionServerStatusService.BlockingInterface rss = rssStub; - try { - if (rss == null) { - createRegionServerStatusStub(); - continue; - } - rss.reportProcedureDone(null, request); - // Log if we had to retry else don't log unless TRACE. We want to - // know if were successful after an attempt showed in logs as failed. - if (tries > 0 || LOG.isTraceEnabled()) { - LOG.info("PROCEDURE REPORTED " + request); - } - return; - } catch (ServiceException se) { - IOException ioe = ProtobufUtil.getRemoteException(se); - boolean pause = - ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException; - if (pause) { - // Do backoff else we flood the Master with requests. - pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries); - } else { - pauseTime = INIT_PAUSE_TIME_MS; // Reset. - } - LOG.info( - "Failed to report transition " + TextFormat.shortDebugString(request) + "; retry (#" + - tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master is coming online...)." - : " immediately."), - ioe); - if (pause) { - Threads.sleep(pauseTime); - } - tries++; - if (rssStub == rss) { - rssStub = null; - } + try { + rss.reportProcedureDone(null, request); + } catch (ServiceException se) { + if (rssStub == rss) { + rssStub = null; } + throw ProtobufUtil.getRemoteException(se); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4d5bec43/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java new file mode 100644 index 0000000..e4be422 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.hadoop.hbase.PleaseHoldException; +import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; + +/** + * A thread which calls {@code reportProcedureDone} to tell master the result of a remote procedure. + */ +@InterfaceAudience.Private +class RemoteProcedureResultReporter extends Thread { + + private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureResultReporter.class); + + // Time to pause if master says 'please hold'. Make configurable if needed. + private static final int INIT_PAUSE_TIME_MS = 1000; + + private static final int MAX_BATCH = 100; + + private final HRegionServer server; + + private final LinkedBlockingQueue<RemoteProcedureResult> results = new LinkedBlockingQueue<>(); + + public RemoteProcedureResultReporter(HRegionServer server) { + this.server = server; + } + + public void complete(long procId, Throwable error) { + RemoteProcedureResult.Builder builder = RemoteProcedureResult.newBuilder().setProcId(procId); + if (error != null) { + builder.setStatus(RemoteProcedureResult.Status.ERROR).setError( + ForeignExceptionUtil.toProtoForeignException(server.getServerName().toString(), error)); + } else { + builder.setStatus(RemoteProcedureResult.Status.SUCCESS); + } + results.add(builder.build()); + } + + @Override + public void run() { + ReportProcedureDoneRequest.Builder builder = ReportProcedureDoneRequest.newBuilder(); + int tries = 0; + while (!server.isStopped()) { + if (builder.getResultCount() == 0) { + try { + builder.addResult(results.take()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + continue; + } + } + while (builder.getResultCount() < MAX_BATCH) { + RemoteProcedureResult result = results.poll(); + if (result == null) { + break; + } + builder.addResult(result); + } + ReportProcedureDoneRequest request = builder.build(); + try { + server.reportProcedureDone(builder.build()); + builder.clear(); + tries = 0; + } catch (IOException e) { + boolean pause = + e instanceof ServerNotRunningYetException || e instanceof PleaseHoldException; + long pauseTime; + if (pause) { + // Do backoff else we flood the Master with requests. + pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries); + } else { + pauseTime = INIT_PAUSE_TIME_MS; // Reset. + } + LOG.info("Failed report procedure " + TextFormat.shortDebugString(request) + "; retry (#" + + tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master is coming online...)." + : " immediately."), + e); + Threads.sleep(pauseTime); + tries++; + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/4d5bec43/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java index 240b0a7..d2175d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java @@ -49,6 +49,6 @@ public class RSProcedureHandler extends EventHandler { LOG.error("Catch exception when call RSProcedureCallable: ", e); error = e; } - ((HRegionServer) server).reportProcedureDone(procId, error); + ((HRegionServer) server).remoteProcedureComplete(procId, error); } }