http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java deleted file mode 100644 index cf08ea9..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.ipc; - -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -@InterfaceAudience.Private -public class TimeLimitedRpcController implements RpcController { - - /** - * The time, in ms before the call should expire. - */ - protected volatile Integer callTimeout; - protected volatile boolean cancelled = false; - protected final AtomicReference<RpcCallback<Object>> cancellationCb = - new AtomicReference<>(null); - - protected final AtomicReference<RpcCallback<IOException>> failureCb = - new AtomicReference<>(null); - - private IOException exception; - - public int getCallTimeout() { - if (callTimeout != null) { - return callTimeout; - } else { - return 0; - } - } - - public void setCallTimeout(int callTimeout) { - this.callTimeout = callTimeout; - } - - public boolean hasCallTimeout(){ - return callTimeout != null; - } - - @Override - public String errorText() { - if (exception != null) { - return exception.getMessage(); - } else { - return null; - } - } - - /** - * For use in async rpc clients - * @return true if failed - */ - @Override - public boolean failed() { - return this.exception != null; - } - - @Override - public boolean isCanceled() { - return cancelled; - } - - @Override - public void notifyOnCancel(RpcCallback<Object> cancellationCb) { - this.cancellationCb.set(cancellationCb); - if (this.cancelled) { - cancellationCb.run(null); - } - } - - /** - * Notify a callback on error. - * For use in async rpc clients - * - * @param failureCb the callback to call on error - */ - public void notifyOnFail(RpcCallback<IOException> failureCb) { - this.failureCb.set(failureCb); - if (this.exception != null) { - failureCb.run(this.exception); - } - } - - @Override - public void reset() { - exception = null; - cancelled = false; - failureCb.set(null); - cancellationCb.set(null); - callTimeout = null; - } - - @Override - public void setFailed(String reason) { - this.exception = new IOException(reason); - if (this.failureCb.get() != null) { - this.failureCb.get().run(this.exception); - } - } - - /** - * Set failed with an exception to pass on. - * For use in async rpc clients - * - * @param e exception to set with - */ - public void setFailed(IOException e) { - this.exception = e; - if (this.failureCb.get() != null) { - this.failureCb.get().run(this.exception); - } - } - - @Override - public void startCancel() { - cancelled = true; - if (cancellationCb.get() != null) { - cancellationCb.get().run(null); - } - } -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 5ba0572..623acd5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.protobuf; +import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier +.RegionSpecifierType.REGION_NAME; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -38,14 +41,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NavigableSet; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; - -import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier -.RegionSpecifierType.REGION_NAME; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -124,8 +125,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.LiveServerInfo; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionInTransition; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad; -import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent; import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos; +import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent; import org.apache.hadoop.hbase.protobuf.generated.FilterProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair; @@ -171,11 +172,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.DynamicClassLoader; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.Methods; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import com.google.common.collect.ArrayListMultimap; @@ -334,17 +333,32 @@ public final class ProtobufUtil { * a new IOException that wraps the unexpected ServiceException. */ public static IOException getRemoteException(ServiceException se) { - Throwable e = se.getCause(); - if (e == null) { - return new IOException(se); + return makeIOExceptionOfException(se); + } + + /** + * Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than + * just {@link ServiceException}. Prefer this method to + * {@link #getRemoteException(ServiceException)} because trying to + * contain direct protobuf references. + * @param e + */ + public static IOException handleRemoteException(Exception e) { + return makeIOExceptionOfException(e); + } + + private static IOException makeIOExceptionOfException(Exception e) { + Throwable t = e; + if (e instanceof ServiceException) { + t = e.getCause(); } - if (ExceptionUtil.isInterrupt(e)) { - return ExceptionUtil.asInterrupt(e); + if (ExceptionUtil.isInterrupt(t)) { + return ExceptionUtil.asInterrupt(t); } - if (e instanceof RemoteException) { - e = ((RemoteException) e).unwrapRemoteException(); + if (t instanceof RemoteException) { + t = ((RemoteException)t).unwrapRemoteException(); } - return e instanceof IOException ? (IOException) e : new IOException(se); + return t instanceof IOException? (IOException)t: new HBaseIOException(t); } /** @@ -1252,7 +1266,6 @@ public final class ProtobufUtil { return toMutation(type, mutation, builder, HConstants.NO_NONCE); } - @SuppressWarnings("deprecation") public static MutationProto toMutation(final MutationType type, final Mutation mutation, MutationProto.Builder builder, long nonce) throws IOException { @@ -2658,13 +2671,11 @@ public final class ProtobufUtil { } } - @SuppressWarnings("deprecation") public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family, List<Path> inputPaths, List<Path> outputPaths, Path storeDir) { return toCompactionDescriptor(info, null, family, inputPaths, outputPaths, storeDir); } - @SuppressWarnings("deprecation") public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] regionName, byte[] family, List<Path> inputPaths, List<Path> outputPaths, Path storeDir) { // compaction descriptor contains relative paths. @@ -3663,4 +3674,28 @@ public final class ProtobufUtil { return new RegionLoadStats(stats.getMemstoreLoad(), stats.getHeapOccupancy(), stats.getCompactionPressure()); } -} + + /** + * @param msg + * @return A String version of the passed in <code>msg</code> + */ + public static String toText(Message msg) { + return TextFormat.shortDebugString(msg); + } + + public static byte [] toBytes(ByteString bs) { + return bs.toByteArray(); + } + + /** + * Contain ServiceException inside here. Take a callable that is doing our pb rpc and run it. + * @throws IOException + */ + public static <T> T call(Callable<T> callable) throws IOException { + try { + return callable.call(); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index f083001..fd2a393 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -65,7 +65,6 @@ public class TestClientScanner { RpcControllerFactory controllerFactory; @Before - @SuppressWarnings("deprecation") public void setup() throws IOException { clusterConn = Mockito.mock(ClusterConnection.class); rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class); http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java index 9c3367e..edcbdc5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java @@ -45,4 +45,5 @@ public class HBaseIOException extends IOException { public HBaseIOException(Throwable cause) { super(cause); - }} + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java index 688b51a..7e6c5d6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java @@ -73,4 +73,4 @@ public class ExceptionUtil { private ExceptionUtil() { } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 73226aa..ec28315 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -75,20 +75,17 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferPool; -import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -99,7 +96,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.HBasePolicyProvider; http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 09dedec..8ddbe18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -87,6 +87,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; @@ -102,7 +104,6 @@ import org.apache.hadoop.util.ToolRunner; /** * Tool to load the output of HFileOutputFormat into an existing table. - * @see #usage() */ @InterfaceAudience.Public @InterfaceStability.Stable @@ -130,11 +131,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private String bulkToken; private UserProvider userProvider; private int nrThreads; + private RpcControllerFactory rpcControllerFactory; private LoadIncrementalHFiles() {} public LoadIncrementalHFiles(Configuration conf) throws Exception { super(conf); + this.rpcControllerFactory = new RpcControllerFactory(conf); initialize(); } @@ -322,7 +325,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { // LQI queue does not need to be threadsafe -- all operations on this queue // happen in this thread Deque<LoadQueueItem> queue = new LinkedList<>(); - SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table); + SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); try { /* @@ -473,9 +476,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool { /** * Used by the replication sink to load the hfiles from the source cluster. It does the following, - * 1. {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)} 2. - * {@link - * LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)} + * <ol> + * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li> + * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap) + * </li> + * </ol> * @param table Table to which these hfiles should be loaded to * @param conn Connection to use * @param queue {@link LoadQueueItem} has hfiles yet to be loaded @@ -776,27 +781,23 @@ public class LoadIncrementalHFiles extends Configured implements Tool { protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn, final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis) throws IOException { - final List<Pair<byte[], String>> famPaths = - new ArrayList<>(lqis.size()); + final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size()); for (LoadQueueItem lqi : lqis) { famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString())); } - - final RegionServerCallable<Boolean> svrCallable = - new RegionServerCallable<Boolean>(conn, tableName, first) { + final RegionServerCallable<Boolean> svrCallable = new RegionServerCallable<Boolean>(conn, + rpcControllerFactory, tableName, first) { @Override - public Boolean call(int callTimeout) throws Exception { + protected Boolean call(PayloadCarryingRpcController controller) throws Exception { SecureBulkLoadClient secureClient = null; boolean success = false; - try { LOG.debug("Going to connect to server " + getLocation() + " for row " + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths); byte[] regionName = getLocation().getRegionInfo().getRegionName(); try (Table table = conn.getTable(getTableName())) { - secureClient = new SecureBulkLoadClient(table); - success = - secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, + secureClient = new SecureBulkLoadClient(getConf(), table); + success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, assignSeqIds, fsDelegationToken.getUserToken(), bulkToken); } return success; @@ -1078,7 +1079,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { /** * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is - * used only when {@link SecureBulkLoadEndpoint} is configured in hbase.coprocessor.region.classes + * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes * property. This directory is used as a temporary directory where all files are initially * copied/moved from user given directory, set all the required file permissions and then from * their it is finally loaded into a table. This should be set only when, one would like to manage @@ -1088,5 +1089,4 @@ public class LoadIncrementalHFiles extends Configured implements Tool { public void setBulkToken(String stagingDir) { this.bulkToken = stagingDir; } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java index a21edcc..3261bd6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java @@ -35,8 +35,6 @@ import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; -import com.google.protobuf.ServiceException; - /** * The Class ExpiredMobFileCleanerChore for running cleaner regularly to remove the expired * mob files. @@ -86,10 +84,6 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore { } catch (LockTimeoutException e) { LOG.info("Fail to acquire the lock because of timeout, maybe a" + " MobCompactor is running", e); - } catch (ServiceException e) { - LOG.error( - "Fail to clean the expired mob files for the column " + hcd.getNameAsString() - + " in the table " + htd.getNameAsString(), e); } catch (IOException e) { LOG.error( "Fail to clean the expired mob files for the column " + hcd.getNameAsString() http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 531883a..d7ba4f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo; -import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId; +import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; @@ -454,8 +454,7 @@ public class ServerManager { /** * Adds the onlineServers list. onlineServers should be locked. * @param serverName The remote servers name. - * @param sl - * @return Server load from the removed server, if any. + * @param s */ @VisibleForTesting void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) { http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java index 3c965cb..d4a54bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java @@ -41,8 +41,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import com.google.protobuf.ServiceException; - /** * The cleaner to delete the expired MOB files. */ @@ -60,11 +58,8 @@ public class ExpiredMobFileCleaner extends Configured implements Tool { * directory. * @param tableName The current table name. * @param family The current family. - * @throws ServiceException - * @throws IOException */ - public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family) - throws ServiceException, IOException { + public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family) throws IOException { Configuration conf = getConf(); TableName tn = TableName.valueOf(tableName); FileSystem fs = FileSystem.get(conf); @@ -99,7 +94,7 @@ public class ExpiredMobFileCleaner extends Configured implements Tool { String tableName = args[0]; String familyName = args[1]; TableName tn = TableName.valueOf(tableName); - HBaseAdmin.checkHBaseAvailable(getConf()); + HBaseAdmin.available(getConf()); Connection connection = ConnectionFactory.createConnection(getConf()); Admin admin = connection.getAdmin(); try { @@ -127,5 +122,4 @@ public class ExpiredMobFileCleaner extends Configured implements Tool { } } } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java index 8547c8c..c27e8ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java @@ -38,8 +38,6 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.zookeeper.KeeperException; -import com.google.protobuf.ServiceException; - /** * The sweep tool. It deletes the mob files that are not used and merges the small mob files to * bigger ones. Each run of this sweep tool only handles one column family. The runs on @@ -64,10 +62,10 @@ public class Sweeper extends Configured implements Tool { * @throws ServiceException */ int sweepFamily(String tableName, String familyName) throws IOException, InterruptedException, - ClassNotFoundException, KeeperException, ServiceException { + ClassNotFoundException, KeeperException { Configuration conf = getConf(); // make sure the target HBase exists. - HBaseAdmin.checkHBaseAvailable(conf); + HBaseAdmin.available(conf); Connection connection = ConnectionFactory.createConnection(getConf()); Admin admin = connection.getAdmin(); try { http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index d87ada4..fb9a605 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -87,7 +87,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController; import org.apache.hadoop.hbase.master.MasterRpcServices; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -2765,13 +2764,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; } - if (controller instanceof TimeLimitedRpcController) { - TimeLimitedRpcController timeLimitedRpcController = - (TimeLimitedRpcController)controller; - if (timeLimitedRpcController.getCallTimeout() > 0) { - timeLimitDelta = Math.min(timeLimitDelta, - timeLimitedRpcController.getCallTimeout()); + if (controller instanceof PayloadCarryingRpcController) { + PayloadCarryingRpcController pRpcController = + (PayloadCarryingRpcController)controller; + if (pRpcController.getCallTimeout() > 0) { + timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout()); } + } else { + throw new UnsupportedOperationException("We only do " + + "PayloadCarryingRpcControllers! FIX IF A PROBLEM"); } // Use half of whichever timeout value was more restrictive... But don't allow // the time limit to be less than the allowable minimum (could cause an http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index 3eb85bd..c71153d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import com.google.protobuf.ServiceException; - import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -44,7 +42,6 @@ import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; @@ -61,10 +58,8 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; */ @InterfaceAudience.Private public class WALEditsReplaySink { - private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class); private static final int MAX_BATCH_SIZE = 1024; - private final Configuration conf; private final ClusterConnection conn; private final TableName tableName; @@ -166,8 +161,8 @@ public class WALEditsReplaySink { try { RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null); ReplayServerCallable<ReplicateWALEntryResponse> callable = - new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc, - regionInfo, entries); + new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.rpcControllerFactory, + this.tableName, regionLoc, entries); factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout); } catch (IOException ie) { if (skipErrors) { @@ -184,31 +179,19 @@ public class WALEditsReplaySink { * @param <R> */ class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> { - private HRegionInfo regionInfo; private List<Entry> entries; - ReplayServerCallable(final Connection connection, final TableName tableName, - final HRegionLocation regionLoc, final HRegionInfo regionInfo, - final List<Entry> entries) { - super(connection, tableName, null); + ReplayServerCallable(final Connection connection, RpcControllerFactory rpcControllerFactory, + final TableName tableName, final HRegionLocation regionLoc, final List<Entry> entries) { + super(connection, rpcControllerFactory, tableName, null); this.entries = entries; - this.regionInfo = regionInfo; setLocation(regionLoc); } @Override - public ReplicateWALEntryResponse call(int callTimeout) throws IOException { - try { - replayToServer(this.regionInfo, this.entries); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - return null; - } - - private void replayToServer(HRegionInfo regionInfo, List<Entry> entries) - throws IOException, ServiceException { - if (entries.isEmpty()) return; + protected ReplicateWALEntryResponse call(PayloadCarryingRpcController controller) + throws Exception { + if (entries.isEmpty()) return null; Entry[] entriesArray = new Entry[entries.size()]; entriesArray = entries.toArray(entriesArray); @@ -216,12 +199,8 @@ public class WALEditsReplaySink { Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); - try { - remoteSvr.replay(controller, p.getFirst()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + controller.setCellScanner(p.getSecond()); + return remoteSvr.replay(controller, p.getFirst()); } @Override @@ -245,4 +224,4 @@ public class WALEditsReplaySink { } } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index b0fd176..c756294 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; @@ -46,27 +45,21 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionAdminServiceCallable; -import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RetryingCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers; -import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink; -import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; -import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; -import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter; import org.apache.hadoop.hbase.replication.BaseWALEntryFilter; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; @@ -74,12 +67,17 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers; +import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink; +import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; +import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; +import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter; import org.apache.hadoop.util.StringUtils; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Lists; -import com.google.protobuf.ServiceException; /** * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint @@ -611,9 +609,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { * Calls replay on the passed edits for the given set of entries belonging to the region. It skips * the entry if the region boundaries have changed or the region is gone. */ - static class RegionReplicaReplayCallable - extends RegionAdminServiceCallable<ReplicateWALEntryResponse> { - + static class RegionReplicaReplayCallable extends + RegionAdminServiceCallable<ReplicateWALEntryResponse> { private final List<Entry> entries; private final byte[] initialEncodedRegionName; private final AtomicLong skippedEntries; @@ -628,38 +625,25 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes(); } - @Override - public ReplicateWALEntryResponse call(int timeout) throws IOException { - return replayToServer(this.entries, timeout); - } - - private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout) - throws IOException { - // check whether we should still replay this entry. If the regions are changed, or the + public ReplicateWALEntryResponse call(PayloadCarryingRpcController controller) throws Exception { + // Check whether we should still replay this entry. If the regions are changed, or the // entry is not coming form the primary region, filter it out because we do not need it. // Regions can change because of (1) region split (2) region merge (3) table recreated boolean skip = false; - if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), - initialEncodedRegionName)) { + initialEncodedRegionName)) { skip = true; } - if (!entries.isEmpty() && !skip) { - Entry[] entriesArray = new Entry[entries.size()]; - entriesArray = entries.toArray(entriesArray); + if (!this.entries.isEmpty() && !skip) { + Entry[] entriesArray = new Entry[this.entries.size()]; + entriesArray = this.entries.toArray(entriesArray); // set the region name for the target region replica Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location .getRegionInfo().getEncodedNameAsBytes(), null, null, null); - try { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); - controller.setCallTimeout(timeout); - controller.setPriority(tableName); - return stub.replay(controller, p.getFirst()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + controller.setCellScanner(p.getSecond()); + return stub.replay(controller, p.getFirst()); } if (skip) { http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java index d708edc..3c81cfe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java @@ -23,19 +23,18 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -80,13 +79,11 @@ public class Merge extends Configured implements Tool { // Verify HBase is down LOG.info("Verifying that HBase is not running..."); try { - HBaseAdmin.checkHBaseAvailable(getConf()); + HBaseAdmin.available(getConf()); LOG.fatal("HBase cluster must be off-line, and is not. Aborting."); return -1; } catch (ZooKeeperConnectionException zkce) { // If no zk, presume no master. - } catch (MasterNotRunningException e) { - // Expected. Ignore. } // Initialize MetaUtils and and get the root of the HBase installation http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java index d778fa9..2dca6b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java @@ -60,7 +60,6 @@ public class TestNamespace { private static ZKNamespaceManager zkNamespaceManager; private String prefix = "TestNamespace"; - @BeforeClass public static void setUp() throws Exception { TEST_UTIL = new HBaseTestingUtility(); @@ -301,7 +300,8 @@ public class TestNamespace { runWithExpectedException(new Callable<Void>() { @Override public Void call() throws Exception { - HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1")); + HTableDescriptor htd = + new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1")); htd.addFamily(new HColumnDescriptor("family1")); admin.createTable(htd); return null; @@ -387,5 +387,4 @@ public class TestNamespace { } fail("Should have thrown exception " + exceptionClass); } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java index d088fc4..1716622 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ProcedureInfo; @@ -59,7 +58,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -67,8 +65,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.protobuf.ServiceException; - /** * Class to test HBaseAdmin. @@ -643,11 +639,9 @@ public class TestAdmin2 { long start = System.currentTimeMillis(); try { - HBaseAdmin.checkHBaseAvailable(conf); + HBaseAdmin.available(conf); assertTrue(false); - } catch (MasterNotRunningException ignored) { } catch (ZooKeeperConnectionException ignored) { - } catch (ServiceException ignored) { } catch (IOException ignored) { } long end = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index 679d9c9..f49c558 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -28,13 +28,10 @@ import java.net.UnknownHostException; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ipc.AbstractRpcClient; @@ -56,7 +53,6 @@ import com.google.protobuf.ServiceException; @Category({MediumTests.class, ClientTests.class}) public class TestClientTimeouts { - private static final Log LOG = LogFactory.getLog(TestClientTimeouts.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); protected static int SLAVES = 1; @@ -87,7 +83,6 @@ public class TestClientTimeouts { */ @Test public void testAdminTimeout() throws Exception { - Connection lastConnection = null; boolean lastFailed = false; int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get(); RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory @@ -105,7 +100,7 @@ public class TestClientTimeouts { connection = ConnectionFactory.createConnection(conf); admin = connection.getAdmin(); // run some admin commands - HBaseAdmin.checkHBaseAvailable(conf); + HBaseAdmin.available(conf); admin.setBalancerRunning(false, false); } catch (ZooKeeperConnectionException ex) { // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 1b20b76..33af5de 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.master.HMaster; @@ -103,8 +104,6 @@ public class TestHCM { TableName.valueOf("test2"); private static final TableName TABLE_NAME3 = TableName.valueOf("test3"); - private static final TableName TABLE_NAME4 = - TableName.valueOf("test4"); private static final byte[] FAM_NAM = Bytes.toBytes("f"); private static final byte[] ROW = Bytes.toBytes("bbb"); private static final byte[] ROW_X = Bytes.toBytes("xxx"); @@ -407,10 +406,11 @@ public class TestHCM { long pauseTime; long baseTime = 100; TableName tableName = TableName.valueOf("HCM-testCallableSleep"); - Table table = TEST_UTIL.createTable(tableName, FAM_NAM); RegionServerCallable<Object> regionServerCallable = new RegionServerCallable<Object>( - TEST_UTIL.getConnection(), tableName, ROW) { - public Object call(int timeout) throws IOException { + TEST_UTIL.getConnection(), new RpcControllerFactory(TEST_UTIL.getConfiguration()), + tableName, ROW) { + @Override + protected Object call(PayloadCarryingRpcController controller) throws Exception { return null; } }; @@ -424,9 +424,10 @@ public class TestHCM { RegionAdminServiceCallable<Object> regionAdminServiceCallable = new RegionAdminServiceCallable<Object>( - (ClusterConnection) TEST_UTIL.getConnection(), new RpcControllerFactory( - TEST_UTIL.getConfiguration()), tableName, ROW) { - public Object call(int timeout) throws IOException { + (ClusterConnection) TEST_UTIL.getConnection(), + new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, ROW) { + @Override + public Object call(PayloadCarryingRpcController controller) throws Exception { return null; } }; @@ -438,16 +439,21 @@ public class TestHCM { assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f)); } - MasterCallable masterCallable = new MasterCallable(TEST_UTIL.getConnection()) { - public Object call(int timeout) throws IOException { + MasterCallable<Object> masterCallable = new MasterCallable<Object>(TEST_UTIL.getConnection(), + new RpcControllerFactory(TEST_UTIL.getConfiguration())) { + @Override + protected Object call(PayloadCarryingRpcController rpcController) throws Exception { return null; } }; - - for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) { - pauseTime = masterCallable.sleep(baseTime, i); - assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i])); - assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f)); + try { + for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) { + pauseTime = masterCallable.sleep(baseTime, i); + assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i])); + assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f)); + } + } finally { + masterCallable.close(); } } @@ -1149,7 +1155,6 @@ public class TestHCM { ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(timeMachine); try { - long timeBase = timeMachine.currentTime(); long largeAmountOfTime = ANY_PAUSE * 1000; ConnectionImplementation.ServerErrorTracker tracker = new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100); http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index 354f0a8..d99d2ee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -332,26 +334,27 @@ public class TestReplicaWithCluster { // bulk load HFiles LOG.debug("Loading test data"); - @SuppressWarnings("deprecation") final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection(); table = conn.getTable(hdt.getTableName()); - final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn); - RegionServerCallable<Void> callable = new RegionServerCallable<Void>( - conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) { - @Override - public Void call(int timeout) throws Exception { - LOG.debug("Going to connect to server " + getLocation() + " for row " + final String bulkToken = + new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn); + RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn, + new RpcControllerFactory(HTU.getConfiguration()), hdt.getTableName(), + TestHRegionServerBulkLoad.rowkey(0)) { + @Override + protected Void call(PayloadCarryingRpcController controller) throws Exception { + LOG.debug("Going to connect to server " + getLocation() + " for row " + Bytes.toStringBinary(getRow())); - SecureBulkLoadClient secureClient = null; - byte[] regionName = getLocation().getRegionInfo().getRegionName(); - try (Table table = conn.getTable(getTableName())) { - secureClient = new SecureBulkLoadClient(table); - secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, - true, null, bulkToken); - } - return null; + SecureBulkLoadClient secureClient = null; + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + try (Table table = conn.getTable(getTableName())) { + secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table); + secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, + true, null, bulkToken); } - }; + return null; + } + }; RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration()); RpcRetryingCaller<Void> caller = factory.newCaller(); caller.callWithRetries(callable, 10000); http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java index 6e68201..30805c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java @@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; @@ -198,19 +200,20 @@ public class TestHRegionServerBulkLoad { } // bulk load HFiles - final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection(); + final ClusterConnection conn = (ClusterConnection)UTIL.getConnection(); Table table = conn.getTable(tableName); - final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn); - RegionServerCallable<Void> callable = - new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) { + final String bulkToken = new SecureBulkLoadClient(UTIL.getConfiguration(), table). + prepareBulkLoad(conn); + RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn, + new RpcControllerFactory(UTIL.getConfiguration()), tableName, Bytes.toBytes("aaa")) { @Override - public Void call(int callTimeout) throws Exception { + public Void call(PayloadCarryingRpcController controller) throws Exception { LOG.debug("Going to connect to server " + getLocation() + " for row " + Bytes.toStringBinary(getRow())); SecureBulkLoadClient secureClient = null; byte[] regionName = getLocation().getRegionInfo().getRegionName(); try (Table table = conn.getTable(getTableName())) { - secureClient = new SecureBulkLoadClient(table); + secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table); secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, true, null, bulkToken); } @@ -224,15 +227,15 @@ public class TestHRegionServerBulkLoad { // Periodically do compaction to reduce the number of open file handles. if (numBulkLoads.get() % 5 == 0) { // 5 * 50 = 250 open file handles! - callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) { + callable = new RegionServerCallable<Void>(conn, + new RpcControllerFactory(UTIL.getConfiguration()), tableName, Bytes.toBytes("aaa")) { @Override - public Void call(int callTimeout) throws Exception { + protected Void call(PayloadCarryingRpcController controller) throws Exception { LOG.debug("compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow())); AdminProtos.AdminService.BlockingInterface server = conn.getAdmin(getLocation().getServerName()); - CompactRegionRequest request = - RequestConverter.buildCompactRegionRequest( + CompactRegionRequest request = RequestConverter.buildCompactRegionRequest( getLocation().getRegionInfo().getRegionName(), true, null); server.compactRegion(null, request); numCompactions.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java index d55adef..7560a41 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; @@ -89,10 +91,12 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul // bulk load HFiles final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection(); + RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration()); RegionServerCallable<Void> callable = - new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) { + new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName, + Bytes.toBytes("aaa")) { @Override - public Void call(int callTimeout) throws Exception { + protected Void call(PayloadCarryingRpcController controller) throws Exception { LOG.info("Non-secure old client"); byte[] regionName = getLocation().getRegionInfo().getRegionName(); BulkLoadHFileRequest request = @@ -109,9 +113,10 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul // Periodically do compaction to reduce the number of open file handles. if (numBulkLoads.get() % 5 == 0) { // 5 * 50 = 250 open file handles! - callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) { + callable = new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName, + Bytes.toBytes("aaa")) { @Override - public Void call(int callTimeout) throws Exception { + protected Void call(PayloadCarryingRpcController controller) throws Exception { LOG.debug("compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow())); AdminProtos.AdminService.BlockingInterface server = http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java index 6de6261..0bc9498 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java @@ -33,13 +33,13 @@ import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; -import org.apache.hadoop.hbase.client.SecureBulkLoadClient; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -62,7 +62,8 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS super(duration); } - private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class); + private static final Log LOG = + LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class); @BeforeClass public static void setUpBeforeClass() throws IOException { @@ -103,16 +104,17 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection(); Table table = conn.getTable(tableName); final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName); + RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration()); RegionServerCallable<Void> callable = - new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) { + new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName, + Bytes.toBytes("aaa")) { @Override - public Void call(int callTimeout) throws Exception { - LOG.debug("Going to connect to server " + getLocation() + " for row " - + Bytes.toStringBinary(getRow())); + protected Void call(PayloadCarryingRpcController controller) throws Exception { + LOG.debug("Going to connect to server " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); try (Table table = conn.getTable(getTableName())) { - boolean loaded = - new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, null, - bulkToken, getLocation().getRegionInfo().getStartKey()); + boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, + null, bulkToken, getLocation().getRegionInfo().getStartKey()); } return null; } @@ -124,9 +126,10 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS // Periodically do compaction to reduce the number of open file handles. if (numBulkLoads.get() % 5 == 0) { // 5 * 50 = 250 open file handles! - callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) { + callable = new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName, + Bytes.toBytes("aaa")) { @Override - public Void call(int callTimeout) throws Exception { + protected Void call(PayloadCarryingRpcController controller) throws Exception { LOG.debug("compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow())); AdminProtos.AdminService.BlockingInterface server = http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java index fa66d69..3e90fe1 100644 --- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java @@ -17,8 +17,6 @@ package org.apache.hadoop.hbase.spark; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; @@ -37,6 +35,8 @@ import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.ByteString; /** * This filter will push down all qualifier logic given to us