Revert "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base."
This reverts commit ed87a81b4b61c4842c12572a47c97ae23773012f. Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0206dc67 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0206dc67 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0206dc67 Branch: refs/heads/master Commit: 0206dc67d643e4a248a319c724cd6e58f0e77603 Parents: ed87a81 Author: stack <[email protected]> Authored: Fri Aug 5 15:18:48 2016 -0700 Committer: stack <[email protected]> Committed: Fri Aug 5 15:18:48 2016 -0700 ---------------------------------------------------------------------- .../client/AbstractRegionServerCallable.java | 23 +- .../hadoop/hbase/client/ClientScanner.java | 2 +- .../hbase/client/ClientSimpleScanner.java | 3 +- .../hadoop/hbase/client/ClientSmallScanner.java | 42 +- .../hadoop/hbase/client/ConnectionCallable.java | 56 + .../hbase/client/ConnectionImplementation.java | 40 +- .../hbase/client/FlushRegionCallable.java | 26 +- .../apache/hadoop/hbase/client/HBaseAdmin.java | 1110 ++++++++++-------- .../org/apache/hadoop/hbase/client/HTable.java | 455 ++++--- .../hadoop/hbase/client/MasterCallable.java | 37 +- .../hbase/client/MasterKeepAliveConnection.java | 3 +- .../hbase/client/MultiServerCallable.java | 35 +- .../client/PayloadCarryingServerCallable.java | 44 +- .../client/RegionAdminServiceCallable.java | 54 +- .../hbase/client/RegionServerCallable.java | 72 +- .../hbase/client/RetryingTimeTracker.java | 12 +- .../hbase/client/ReversedScannerCallable.java | 4 +- .../hbase/client/RpcRetryingCallable.java | 65 - .../hadoop/hbase/client/RpcRetryingCaller.java | 5 +- .../hbase/client/RpcRetryingCallerFactory.java | 1 - .../RpcRetryingCallerWithReadReplicas.java | 26 +- .../hadoop/hbase/client/ScannerCallable.java | 140 ++- .../hbase/client/SecureBulkLoadClient.java | 80 +- .../hbase/ipc/MasterCoprocessorRpcChannel.java | 3 +- .../hbase/ipc/PayloadCarryingRpcController.java | 139 +-- .../hbase/ipc/RegionCoprocessorRpcChannel.java | 23 +- .../hbase/ipc/TimeLimitedRpcController.java | 142 +++ .../hadoop/hbase/protobuf/ProtobufUtil.java | 73 +- .../hadoop/hbase/client/TestClientScanner.java | 1 + .../apache/hadoop/hbase/HBaseIOException.java | 3 +- .../apache/hadoop/hbase/util/ExceptionUtil.java | 2 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 6 +- .../hbase/mapreduce/LoadIncrementalHFiles.java | 36 +- .../master/ExpiredMobFileCleanerChore.java | 6 + .../hadoop/hbase/master/ServerManager.java | 5 +- .../hadoop/hbase/mob/ExpiredMobFileCleaner.java | 12 +- .../hadoop/hbase/mob/mapreduce/Sweeper.java | 6 +- .../hbase/regionserver/RSRpcServices.java | 15 +- .../regionserver/wal/WALEditsReplaySink.java | 43 +- .../RegionReplicaReplicationEndpoint.java | 54 +- .../org/apache/hadoop/hbase/util/Merge.java | 13 +- .../org/apache/hadoop/hbase/TestNamespace.java | 7 +- .../apache/hadoop/hbase/client/TestAdmin2.java | 8 +- .../hadoop/hbase/client/TestClientTimeouts.java | 7 +- .../org/apache/hadoop/hbase/client/TestHCM.java | 37 +- .../hbase/client/TestReplicaWithCluster.java | 35 +- .../regionserver/TestHRegionServerBulkLoad.java | 23 +- .../TestHRegionServerBulkLoadWithOldClient.java | 13 +- ...gionServerBulkLoadWithOldSecureEndpoint.java | 27 +- .../hbase/spark/SparkSQLPushDownFilter.java | 4 +- 50 files changed, 1630 insertions(+), 1448 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java index 5a1f5cc..7279d81 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRegionServerCallable.java @@ -18,7 +18,8 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; @@ -28,15 +29,26 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; /** - * Added by HBASE-15745 Refactor of RPC classes to better accept async changes. - * Temporary. + * Implementations call a RegionServer. + * Passed to a {@link RpcRetryingCaller} so we retry on fail. + * TODO: this class is actually tied to one region, because most of the paths make use of + * the regioninfo part of location when building requests. The only reason it works for + * multi-region requests (e.g. batch) is that they happen to not use the region parts. + * This could be done cleaner (e.g. having a generic parameter and 2 derived classes, + * RegionCallable and actual RegionServerCallable with ServerName. + * @param <T> the class that the ServerCallable handles */ @InterfaceAudience.Private abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> { + // Public because used outside of this package over in ipc. + private static final Log LOG = LogFactory.getLog(AbstractRegionServerCallable.class); + protected final Connection connection; protected final TableName tableName; protected final byte[] row; + protected HRegionLocation location; + protected final static int MIN_WAIT_DEAD_SERVER = 10000; /** @@ -115,7 +127,8 @@ abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> { @Override public void prepare(final boolean reload) throws IOException { // check table state if this is a retry - if (reload && !tableName.equals(TableName.META_TABLE_NAME) && + if (reload && + !tableName.equals(TableName.META_TABLE_NAME) && getConnection().isTableDisabled(tableName)) { throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled."); } @@ -135,4 +148,4 @@ abstract class AbstractRegionServerCallable<T> implements RetryingCallable<T> { * @throws IOException When client could not be created */ abstract void setClientByServiceName(ServerName serviceName) throws IOException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 3e676c7..cb4c714 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -847,4 +847,4 @@ public abstract class ClientScanner extends AbstractClientScanner { Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length); return Result.create(list, result.getExists(), result.isStale(), result.isPartial()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java index ecf083b..f886971 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService; */ @InterfaceAudience.Private public class ClientSimpleScanner extends ClientScanner { + public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name, ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, RpcControllerFactory rpcControllerFactory, ExecutorService pool, @@ -49,4 +50,4 @@ public class ClientSimpleScanner extends ClientScanner { public Result next() throws IOException { return nextWithSyncCache(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java index 429c4cf..f9bdd55 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java @@ -18,10 +18,8 @@ */ package org.apache.hadoop.hbase.client; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.concurrent.ExecutorService; - +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -31,15 +29,17 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.util.Bytes; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.ExecutorService; /** * Client scanner for small scan. Generally, only one RPC is called to fetch the @@ -185,7 +185,7 @@ public class ClientSmallScanner extends ClientSimpleScanner { } @Override - protected Result[] call(PayloadCarryingRpcController controller) throws Exception { + public Result[] call(int timeout) throws IOException { if (this.closed) return null; if (Thread.interrupted()) { throw new InterruptedIOException(); @@ -193,17 +193,25 @@ public class ClientSmallScanner extends ClientSimpleScanner { ScanRequest request = RequestConverter.buildScanRequest(getLocation() .getRegionInfo().getRegionName(), getScan(), getCaching(), true); ScanResponse response = null; - response = getStub().scan(controller, request); - Result[] results = ResponseConverter.getResults(controller.cellScanner(), response); - if (response.hasMoreResultsInRegion()) { - setHasMoreResultsContext(true); - setServerHasMoreResults(response.getMoreResultsInRegion()); - } else { - setHasMoreResultsContext(false); + controller = controllerFactory.newController(); + try { + controller.setPriority(getTableName()); + controller.setCallTimeout(timeout); + response = getStub().scan(controller, request); + Result[] results = ResponseConverter.getResults(controller.cellScanner(), + response); + if (response.hasMoreResultsInRegion()) { + setHasMoreResultsContext(true); + setServerHasMoreResults(response.getMoreResultsInRegion()); + } else { + setHasMoreResultsContext(false); + } + // We need to update result metrics since we are overriding call() + updateResultsMetrics(results); + return results; + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } - // We need to update result metrics since we are overriding call() - updateResultsMetrics(results); - return results; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java new file mode 100644 index 0000000..3f44927 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionCallable.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A RetryingCallable for generic connection operations. + * @param <V> return type + */ +abstract class ConnectionCallable<V> implements RetryingCallable<V>, Closeable { + protected Connection connection; + + public ConnectionCallable(final Connection connection) { + this.connection = connection; + } + + @Override + public void prepare(boolean reload) throws IOException { + } + + @Override + public void close() throws IOException { + } + + @Override + public void throwable(Throwable t, boolean retrying) { + } + + @Override + public String getExceptionMessageAdditionalDetail() { + return ""; + } + + @Override + public long sleep(long pause, int tries) { + return ConnectionUtils.getPauseTime(pause, tries); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 638050f..8dcda13 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; @@ -63,7 +68,6 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; @@ -91,11 +95,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - /** * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces. * Encapsulates connection to zookeeper and regionservers. @@ -934,13 +933,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable { this.stub = null; } - boolean isMasterRunning() throws IOException { - MasterProtos.IsMasterRunningResponse response = null; - try { - response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); - } catch (Exception e) { - throw ProtobufUtil.handleRemoteException(e); - } + boolean isMasterRunning() throws ServiceException { + MasterProtos.IsMasterRunningResponse response = + this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); return response != null? response.getIsMasterRunning(): false; } } @@ -1063,14 +1058,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable { /** * Once setup, check it works by doing isMasterRunning check. */ - protected abstract void isMasterRunning() throws IOException; + protected abstract void isMasterRunning() throws ServiceException; /** * Create a stub. Try once only. It is not typed because there is no common type to * protobuf services nor their interfaces. Let the caller do appropriate casting. * @return A stub for master services. */ - private Object makeStubNoRetries() throws IOException, KeeperException { + private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException { ZooKeeperKeepAliveConnection zkw; try { zkw = getKeepAliveZooKeeperWatcher(); @@ -1110,7 +1105,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } /** - * Create a stub against the master. Retry if necessary. + * Create a stub against the master. Retry if necessary. * @return A stub to do <code>intf</code> against the master * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running */ @@ -1126,7 +1121,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable { exceptionCaught = e; } catch (KeeperException e) { exceptionCaught = e; + } catch (ServiceException e) { + exceptionCaught = e; } + throw new MasterNotRunningException(exceptionCaught); } else { throw new DoNotRetryIOException("Connection was closed while trying to get master"); @@ -1157,12 +1155,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } @Override - protected void isMasterRunning() throws IOException { - try { - this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); - } catch (Exception e) { - throw ProtobufUtil.handleRemoteException(e); - } + protected void isMasterRunning() throws ServiceException { + this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); } } @@ -1707,7 +1701,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // java.net.ConnectException but they're not declared. So we catch it... LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable()); return false; - } catch (IOException se) { + } catch (ServiceException se) { LOG.warn("Checking master connection", se); return false; } http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java index c7bf804..73bdb74 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java @@ -27,18 +27,23 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import com.google.protobuf.ServiceException; + /** * A Callable for flushRegion() RPC. */ @InterfaceAudience.Private public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionResponse> { + private static final Log LOG = LogFactory.getLog(FlushRegionCallable.class); + private final byte[] regionName; private final boolean writeFlushWalMarker; private boolean reload; @@ -59,14 +64,18 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR } @Override + public FlushRegionResponse call(int callTimeout) throws Exception { + return flushRegion(); + } + + @Override public void prepare(boolean reload) throws IOException { super.prepare(reload); this.reload = reload; } - @Override - protected FlushRegionResponse call(PayloadCarryingRpcController controller) throws Exception { - // Check whether we should still do the flush to this region. If the regions are changed due + private FlushRegionResponse flushRegion() throws IOException { + // check whether we should still do the flush to this region. If the regions are changed due // to splits or merges, etc return success if (!Bytes.equals(location.getRegionInfo().getRegionName(), regionName)) { if (!reload) { @@ -84,6 +93,13 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR FlushRegionRequest request = RequestConverter.buildFlushRegionRequest(regionName, writeFlushWalMarker); - return stub.flushRegion(controller, request); + + try { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + return stub.flushRegion(controller, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } -} \ No newline at end of file +}
