http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
new file mode 100644
index 0000000..cf08ea9
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class TimeLimitedRpcController implements RpcController {
+
+  /**
+   * The time, in ms before the call should expire.
+   */
+  protected volatile Integer callTimeout;
+  protected volatile boolean cancelled = false;
+  protected final AtomicReference<RpcCallback<Object>> cancellationCb =
+      new AtomicReference<>(null);
+
+  protected final AtomicReference<RpcCallback<IOException>> failureCb =
+      new AtomicReference<>(null);
+
+  private IOException exception;
+
+  public int getCallTimeout() {
+    if (callTimeout != null) {
+      return callTimeout;
+    } else {
+      return 0;
+    }
+  }
+
+  public void setCallTimeout(int callTimeout) {
+    this.callTimeout = callTimeout;
+  }
+
+  public boolean hasCallTimeout(){
+    return callTimeout != null;
+  }
+
+  @Override
+  public String errorText() {
+    if (exception != null) {
+      return exception.getMessage();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * For use in async rpc clients
+   * @return true if failed
+   */
+  @Override
+  public boolean failed() {
+    return this.exception != null;
+  }
+
+  @Override
+  public boolean isCanceled() {
+    return cancelled;
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
+    this.cancellationCb.set(cancellationCb);
+    if (this.cancelled) {
+      cancellationCb.run(null);
+    }
+  }
+
+  /**
+   * Notify a callback on error.
+   * For use in async rpc clients
+   *
+   * @param failureCb the callback to call on error
+   */
+  public void notifyOnFail(RpcCallback<IOException> failureCb) {
+    this.failureCb.set(failureCb);
+    if (this.exception != null) {
+      failureCb.run(this.exception);
+    }
+  }
+
+  @Override
+  public void reset() {
+    exception = null;
+    cancelled = false;
+    failureCb.set(null);
+    cancellationCb.set(null);
+    callTimeout = null;
+  }
+
+  @Override
+  public void setFailed(String reason) {
+    this.exception = new IOException(reason);
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
+  }
+
+  /**
+   * Set failed with an exception to pass on.
+   * For use in async rpc clients
+   *
+   * @param e exception to set with
+   */
+  public void setFailed(IOException e) {
+    this.exception = e;
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
+  }
+
+  @Override
+  public void startCancel() {
+    cancelled = true;
+    if (cancellationCb.get() != null) {
+      cancellationCb.get().run(null);
+    }
+  }
+}
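As a quick illustration of the controller reintroduced above (this sketch is not part of the commit; the class name TimeLimitedRpcControllerExample is made up for illustration and it only calls methods defined in the file above), a client might drive the timeout, failure-callback and cancellation paths like this:

    package org.apache.hadoop.hbase.ipc;

    import com.google.protobuf.RpcCallback;
    import java.io.IOException;

    public class TimeLimitedRpcControllerExample {
      public static void main(String[] args) {
        TimeLimitedRpcController controller = new TimeLimitedRpcController();

        // The caller sets a per-call timeout in milliseconds; getCallTimeout()
        // returns 0 while no timeout has been set.
        controller.setCallTimeout(60000);
        System.out.println("has timeout: " + controller.hasCallTimeout()
            + ", timeout: " + controller.getCallTimeout());

        // Register a failure callback; it runs immediately if a failure was
        // already recorded, otherwise it runs when setFailed(...) is invoked.
        controller.notifyOnFail(new RpcCallback<IOException>() {
          @Override
          public void run(IOException e) {
            System.out.println("call failed: " + e.getMessage());
          }
        });
        controller.setFailed(new IOException("region moved"));
        System.out.println("failed: " + controller.failed()
            + ", errorText: " + controller.errorText());

        // reset() clears timeout, failure and cancellation state so the
        // controller can be reused; startCancel() then marks it cancelled.
        controller.reset();
        controller.startCancel();
        System.out.println("cancelled after reset+startCancel: " + controller.isCanceled());
      }
    }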
http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 623acd5..5ba0572 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.protobuf; -import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier -.RegionSpecifierType.REGION_NAME; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -41,11 +38,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NavigableSet; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; + +import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier +.RegionSpecifierType.REGION_NAME; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; @@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -125,8 +124,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.LiveServerInfo; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionInTransition; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad; -import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos; import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent; +import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos; import org.apache.hadoop.hbase.protobuf.generated.FilterProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair; @@ -172,9 +171,11 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.DynamicClassLoader; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.Methods; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import com.google.common.collect.ArrayListMultimap; @@ -333,32 +334,17 @@ public final class ProtobufUtil { * a new IOException that wraps the unexpected ServiceException. */ public static IOException getRemoteException(ServiceException se) { - return makeIOExceptionOfException(se); - } - - /** - * Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than - * just {@link ServiceException}. 
Prefer this method to - * {@link #getRemoteException(ServiceException)} because trying to - * contain direct protobuf references. - * @param e - */ - public static IOException handleRemoteException(Exception e) { - return makeIOExceptionOfException(e); - } - - private static IOException makeIOExceptionOfException(Exception e) { - Throwable t = e; - if (e instanceof ServiceException) { - t = e.getCause(); + Throwable e = se.getCause(); + if (e == null) { + return new IOException(se); } - if (ExceptionUtil.isInterrupt(t)) { - return ExceptionUtil.asInterrupt(t); + if (ExceptionUtil.isInterrupt(e)) { + return ExceptionUtil.asInterrupt(e); } - if (t instanceof RemoteException) { - t = ((RemoteException)t).unwrapRemoteException(); + if (e instanceof RemoteException) { + e = ((RemoteException) e).unwrapRemoteException(); } - return t instanceof IOException? (IOException)t: new HBaseIOException(t); + return e instanceof IOException ? (IOException) e : new IOException(se); } /** @@ -1266,6 +1252,7 @@ public final class ProtobufUtil { return toMutation(type, mutation, builder, HConstants.NO_NONCE); } + @SuppressWarnings("deprecation") public static MutationProto toMutation(final MutationType type, final Mutation mutation, MutationProto.Builder builder, long nonce) throws IOException { @@ -2671,11 +2658,13 @@ public final class ProtobufUtil { } } + @SuppressWarnings("deprecation") public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family, List<Path> inputPaths, List<Path> outputPaths, Path storeDir) { return toCompactionDescriptor(info, null, family, inputPaths, outputPaths, storeDir); } + @SuppressWarnings("deprecation") public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] regionName, byte[] family, List<Path> inputPaths, List<Path> outputPaths, Path storeDir) { // compaction descriptor contains relative paths. @@ -3674,28 +3663,4 @@ public final class ProtobufUtil { return new RegionLoadStats(stats.getMemstoreLoad(), stats.getHeapOccupancy(), stats.getCompactionPressure()); } - - /** - * @param msg - * @return A String version of the passed in <code>msg</code> - */ - public static String toText(Message msg) { - return TextFormat.shortDebugString(msg); - } - - public static byte [] toBytes(ByteString bs) { - return bs.toByteArray(); - } - - /** - * Contain ServiceException inside here. Take a callable that is doing our pb rpc and run it. 
- * @throws IOException - */ - public static <T> T call(Callable<T> callable) throws IOException { - try { - return callable.call(); - } catch (Exception e) { - throw ProtobufUtil.handleRemoteException(e); - } - } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index fd2a393..f083001 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -65,6 +65,7 @@ public class TestClientScanner { RpcControllerFactory controllerFactory; @Before + @SuppressWarnings("deprecation") public void setup() throws IOException { clusterConn = Mockito.mock(ClusterConnection.class); rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class); http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java index edcbdc5..9c3367e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java @@ -45,5 +45,4 @@ public class HBaseIOException extends IOException { public HBaseIOException(Throwable cause) { super(cause); - } -} \ No newline at end of file + }} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java index 7e6c5d6..688b51a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java @@ -73,4 +73,4 @@ public class ExceptionUtil { private ExceptionUtil() { } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index ec28315..73226aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -75,17 +75,20 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import 
org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.io.ByteBufferInputStream; -import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -96,6 +99,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.HBasePolicyProvider; http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 8ddbe18..09dedec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -87,8 +87,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; @@ -104,6 +102,7 @@ import org.apache.hadoop.util.ToolRunner; /** * Tool to load the output of HFileOutputFormat into an existing table. 
+ * @see #usage() */ @InterfaceAudience.Public @InterfaceStability.Stable @@ -131,13 +130,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private String bulkToken; private UserProvider userProvider; private int nrThreads; - private RpcControllerFactory rpcControllerFactory; private LoadIncrementalHFiles() {} public LoadIncrementalHFiles(Configuration conf) throws Exception { super(conf); - this.rpcControllerFactory = new RpcControllerFactory(conf); initialize(); } @@ -325,7 +322,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { // LQI queue does not need to be threadsafe -- all operations on this queue // happen in this thread Deque<LoadQueueItem> queue = new LinkedList<>(); - SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); + SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table); try { /* @@ -476,11 +473,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool { /** * Used by the replication sink to load the hfiles from the source cluster. It does the following, - * <ol> - * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li> - * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap) - * </li> - * </ol> + * 1. {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)} 2. + * {@link + * LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)} * @param table Table to which these hfiles should be loaded to * @param conn Connection to use * @param queue {@link LoadQueueItem} has hfiles yet to be loaded @@ -781,23 +776,27 @@ public class LoadIncrementalHFiles extends Configured implements Tool { protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn, final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis) throws IOException { - final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size()); + final List<Pair<byte[], String>> famPaths = + new ArrayList<>(lqis.size()); for (LoadQueueItem lqi : lqis) { famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString())); } - final RegionServerCallable<Boolean> svrCallable = new RegionServerCallable<Boolean>(conn, - rpcControllerFactory, tableName, first) { + + final RegionServerCallable<Boolean> svrCallable = + new RegionServerCallable<Boolean>(conn, tableName, first) { @Override - protected Boolean call(PayloadCarryingRpcController controller) throws Exception { + public Boolean call(int callTimeout) throws Exception { SecureBulkLoadClient secureClient = null; boolean success = false; + try { LOG.debug("Going to connect to server " + getLocation() + " for row " + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths); byte[] regionName = getLocation().getRegionInfo().getRegionName(); try (Table table = conn.getTable(getTableName())) { - secureClient = new SecureBulkLoadClient(getConf(), table); - success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, + secureClient = new SecureBulkLoadClient(table); + success = + secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, assignSeqIds, fsDelegationToken.getUserToken(), bulkToken); } return success; @@ -1079,7 +1078,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { /** * Called from replication sink, where it manages bulkToken(staging directory) by itself. 
This is - * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes + * used only when {@link SecureBulkLoadEndpoint} is configured in hbase.coprocessor.region.classes * property. This directory is used as a temporary directory where all files are initially * copied/moved from user given directory, set all the required file permissions and then from * their it is finally loaded into a table. This should be set only when, one would like to manage @@ -1089,4 +1088,5 @@ public class LoadIncrementalHFiles extends Configured implements Tool { public void setBulkToken(String stagingDir) { this.bulkToken = stagingDir; } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java index 3261bd6..a21edcc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java @@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; +import com.google.protobuf.ServiceException; + /** * The Class ExpiredMobFileCleanerChore for running cleaner regularly to remove the expired * mob files. @@ -84,6 +86,10 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore { } catch (LockTimeoutException e) { LOG.info("Fail to acquire the lock because of timeout, maybe a" + " MobCompactor is running", e); + } catch (ServiceException e) { + LOG.error( + "Fail to clean the expired mob files for the column " + hcd.getNameAsString() + + " in the table " + htd.getNameAsString(), e); } catch (IOException e) { LOG.error( "Fail to clean the expired mob files for the column " + hcd.getNameAsString() http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index d7ba4f3..531883a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo; +import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId; -import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import 
org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; @@ -454,7 +454,8 @@ public class ServerManager { /** * Adds the onlineServers list. onlineServers should be locked. * @param serverName The remote servers name. - * @param s + * @param sl + * @return Server load from the removed server, if any. */ @VisibleForTesting void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) { http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java index d4a54bb..3c965cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java @@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import com.google.protobuf.ServiceException; + /** * The cleaner to delete the expired MOB files. */ @@ -58,8 +60,11 @@ public class ExpiredMobFileCleaner extends Configured implements Tool { * directory. * @param tableName The current table name. * @param family The current family. + * @throws ServiceException + * @throws IOException */ - public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family) throws IOException { + public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family) + throws ServiceException, IOException { Configuration conf = getConf(); TableName tn = TableName.valueOf(tableName); FileSystem fs = FileSystem.get(conf); @@ -94,7 +99,7 @@ public class ExpiredMobFileCleaner extends Configured implements Tool { String tableName = args[0]; String familyName = args[1]; TableName tn = TableName.valueOf(tableName); - HBaseAdmin.available(getConf()); + HBaseAdmin.checkHBaseAvailable(getConf()); Connection connection = ConnectionFactory.createConnection(getConf()); Admin admin = connection.getAdmin(); try { @@ -122,4 +127,5 @@ public class ExpiredMobFileCleaner extends Configured implements Tool { } } } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java index c27e8ae..8547c8c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java @@ -38,6 +38,8 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.zookeeper.KeeperException; +import com.google.protobuf.ServiceException; + /** * The sweep tool. It deletes the mob files that are not used and merges the small mob files to * bigger ones. Each run of this sweep tool only handles one column family. 
The runs on @@ -62,10 +64,10 @@ public class Sweeper extends Configured implements Tool { * @throws ServiceException */ int sweepFamily(String tableName, String familyName) throws IOException, InterruptedException, - ClassNotFoundException, KeeperException { + ClassNotFoundException, KeeperException, ServiceException { Configuration conf = getConf(); // make sure the target HBase exists. - HBaseAdmin.available(conf); + HBaseAdmin.checkHBaseAvailable(conf); Connection connection = ConnectionFactory.createConnection(getConf()); Admin admin = connection.getAdmin(); try { http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index fb9a605..d87ada4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -87,6 +87,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController; import org.apache.hadoop.hbase.master.MasterRpcServices; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -2764,15 +2765,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout; } - if (controller instanceof PayloadCarryingRpcController) { - PayloadCarryingRpcController pRpcController = - (PayloadCarryingRpcController)controller; - if (pRpcController.getCallTimeout() > 0) { - timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout()); + if (controller instanceof TimeLimitedRpcController) { + TimeLimitedRpcController timeLimitedRpcController = + (TimeLimitedRpcController)controller; + if (timeLimitedRpcController.getCallTimeout() > 0) { + timeLimitDelta = Math.min(timeLimitDelta, + timeLimitedRpcController.getCallTimeout()); } - } else { - throw new UnsupportedOperationException("We only do " + - "PayloadCarryingRpcControllers! FIX IF A PROBLEM"); } // Use half of whichever timeout value was more restrictive... 
But don't allow // the time limit to be less than the allowable minimum (could cause an http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java index c71153d..3eb85bd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; @@ -58,8 +61,10 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; */ @InterfaceAudience.Private public class WALEditsReplaySink { + private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class); private static final int MAX_BATCH_SIZE = 1024; + private final Configuration conf; private final ClusterConnection conn; private final TableName tableName; @@ -161,8 +166,8 @@ public class WALEditsReplaySink { try { RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null); ReplayServerCallable<ReplicateWALEntryResponse> callable = - new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.rpcControllerFactory, - this.tableName, regionLoc, entries); + new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc, + regionInfo, entries); factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout); } catch (IOException ie) { if (skipErrors) { @@ -179,19 +184,31 @@ public class WALEditsReplaySink { * @param <R> */ class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> { + private HRegionInfo regionInfo; private List<Entry> entries; - ReplayServerCallable(final Connection connection, RpcControllerFactory rpcControllerFactory, - final TableName tableName, final HRegionLocation regionLoc, final List<Entry> entries) { - super(connection, rpcControllerFactory, tableName, null); + ReplayServerCallable(final Connection connection, final TableName tableName, + final HRegionLocation regionLoc, final HRegionInfo regionInfo, + final List<Entry> entries) { + super(connection, tableName, null); this.entries = entries; + this.regionInfo = regionInfo; setLocation(regionLoc); } @Override - protected ReplicateWALEntryResponse call(PayloadCarryingRpcController controller) - throws Exception { - if (entries.isEmpty()) return null; + public ReplicateWALEntryResponse call(int callTimeout) throws IOException { + try { + replayToServer(this.regionInfo, this.entries); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + 
return null; + } + + private void replayToServer(HRegionInfo regionInfo, List<Entry> entries) + throws IOException, ServiceException { + if (entries.isEmpty()) return; Entry[] entriesArray = new Entry[entries.size()]; entriesArray = entries.toArray(entriesArray); @@ -199,8 +216,12 @@ public class WALEditsReplaySink { Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray); - controller.setCellScanner(p.getSecond()); - return remoteSvr.replay(controller, p.getFirst()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); + try { + remoteSvr.replay(controller, p.getFirst()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } @Override @@ -224,4 +245,4 @@ public class WALEditsReplaySink { } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index c756294..b0fd176 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; @@ -45,21 +46,27 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.RegionLocations; -import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionAdminServiceCallable; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RetryingCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers; +import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink; +import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; +import 
org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; +import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter; import org.apache.hadoop.hbase.replication.BaseWALEntryFilter; import org.apache.hadoop.hbase.replication.ChainWALEntryFilter; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; @@ -67,17 +74,12 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers; -import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink; -import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; -import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; -import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter; import org.apache.hadoop.util.StringUtils; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Lists; +import com.google.protobuf.ServiceException; /** * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint @@ -609,8 +611,9 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { * Calls replay on the passed edits for the given set of entries belonging to the region. It skips * the entry if the region boundaries have changed or the region is gone. */ - static class RegionReplicaReplayCallable extends - RegionAdminServiceCallable<ReplicateWALEntryResponse> { + static class RegionReplicaReplayCallable + extends RegionAdminServiceCallable<ReplicateWALEntryResponse> { + private final List<Entry> entries; private final byte[] initialEncodedRegionName; private final AtomicLong skippedEntries; @@ -625,25 +628,38 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes(); } - public ReplicateWALEntryResponse call(PayloadCarryingRpcController controller) throws Exception { - // Check whether we should still replay this entry. If the regions are changed, or the + @Override + public ReplicateWALEntryResponse call(int timeout) throws IOException { + return replayToServer(this.entries, timeout); + } + + private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout) + throws IOException { + // check whether we should still replay this entry. If the regions are changed, or the // entry is not coming form the primary region, filter it out because we do not need it. 
// Regions can change because of (1) region split (2) region merge (3) table recreated boolean skip = false; + if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), - initialEncodedRegionName)) { + initialEncodedRegionName)) { skip = true; } - if (!this.entries.isEmpty() && !skip) { - Entry[] entriesArray = new Entry[this.entries.size()]; - entriesArray = this.entries.toArray(entriesArray); + if (!entries.isEmpty() && !skip) { + Entry[] entriesArray = new Entry[entries.size()]; + entriesArray = entries.toArray(entriesArray); // set the region name for the target region replica Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location .getRegionInfo().getEncodedNameAsBytes(), null, null, null); - controller.setCellScanner(p.getSecond()); - return stub.replay(controller, p.getFirst()); + try { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond()); + controller.setCallTimeout(timeout); + controller.setPriority(tableName); + return stub.replay(controller, p.getFirst()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } if (skip) { http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java index 3c81cfe..d708edc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java @@ -23,18 +23,19 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -79,11 +80,13 @@ public class Merge extends Configured implements Tool { // Verify HBase is down LOG.info("Verifying that HBase is not running..."); try { - HBaseAdmin.available(getConf()); + HBaseAdmin.checkHBaseAvailable(getConf()); LOG.fatal("HBase cluster must be off-line, and is not. Aborting."); return -1; } catch (ZooKeeperConnectionException zkce) { // If no zk, presume no master. + } catch (MasterNotRunningException e) { + // Expected. Ignore. 
} // Initialize MetaUtils and and get the root of the HBase installation http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java index 2dca6b1..d778fa9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java @@ -60,6 +60,7 @@ public class TestNamespace { private static ZKNamespaceManager zkNamespaceManager; private String prefix = "TestNamespace"; + @BeforeClass public static void setUp() throws Exception { TEST_UTIL = new HBaseTestingUtility(); @@ -300,8 +301,7 @@ public class TestNamespace { runWithExpectedException(new Callable<Void>() { @Override public Void call() throws Exception { - HTableDescriptor htd = - new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1")); + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1")); htd.addFamily(new HColumnDescriptor("family1")); admin.createTable(htd); return null; @@ -387,4 +387,5 @@ public class TestNamespace { } fail("Should have thrown exception " + exceptionClass); } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java index 1716622..d088fc4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.ProcedureInfo; @@ -58,6 +59,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -65,6 +67,8 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.google.protobuf.ServiceException; + /** * Class to test HBaseAdmin. 
@@ -639,9 +643,11 @@ public class TestAdmin2 { long start = System.currentTimeMillis(); try { - HBaseAdmin.available(conf); + HBaseAdmin.checkHBaseAvailable(conf); assertTrue(false); + } catch (MasterNotRunningException ignored) { } catch (ZooKeeperConnectionException ignored) { + } catch (ServiceException ignored) { } catch (IOException ignored) { } long end = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index f49c558..679d9c9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -28,10 +28,13 @@ import java.net.UnknownHostException; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ipc.AbstractRpcClient; @@ -53,6 +56,7 @@ import com.google.protobuf.ServiceException; @Category({MediumTests.class, ClientTests.class}) public class TestClientTimeouts { + private static final Log LOG = LogFactory.getLog(TestClientTimeouts.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); protected static int SLAVES = 1; @@ -83,6 +87,7 @@ public class TestClientTimeouts { */ @Test public void testAdminTimeout() throws Exception { + Connection lastConnection = null; boolean lastFailed = false; int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get(); RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory @@ -100,7 +105,7 @@ public class TestClientTimeouts { connection = ConnectionFactory.createConnection(conf); admin = connection.getAdmin(); // run some admin commands - HBaseAdmin.available(conf); + HBaseAdmin.checkHBaseAvailable(conf); admin.setBalancerRunning(false, false); } catch (ZooKeeperConnectionException ex) { // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 33af5de..1b20b76 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import 
org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.master.HMaster; @@ -104,6 +103,8 @@ public class TestHCM { TableName.valueOf("test2"); private static final TableName TABLE_NAME3 = TableName.valueOf("test3"); + private static final TableName TABLE_NAME4 = + TableName.valueOf("test4"); private static final byte[] FAM_NAM = Bytes.toBytes("f"); private static final byte[] ROW = Bytes.toBytes("bbb"); private static final byte[] ROW_X = Bytes.toBytes("xxx"); @@ -406,11 +407,10 @@ public class TestHCM { long pauseTime; long baseTime = 100; TableName tableName = TableName.valueOf("HCM-testCallableSleep"); + Table table = TEST_UTIL.createTable(tableName, FAM_NAM); RegionServerCallable<Object> regionServerCallable = new RegionServerCallable<Object>( - TEST_UTIL.getConnection(), new RpcControllerFactory(TEST_UTIL.getConfiguration()), - tableName, ROW) { - @Override - protected Object call(PayloadCarryingRpcController controller) throws Exception { + TEST_UTIL.getConnection(), tableName, ROW) { + public Object call(int timeout) throws IOException { return null; } }; @@ -424,10 +424,9 @@ public class TestHCM { RegionAdminServiceCallable<Object> regionAdminServiceCallable = new RegionAdminServiceCallable<Object>( - (ClusterConnection) TEST_UTIL.getConnection(), - new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, ROW) { - @Override - public Object call(PayloadCarryingRpcController controller) throws Exception { + (ClusterConnection) TEST_UTIL.getConnection(), new RpcControllerFactory( + TEST_UTIL.getConfiguration()), tableName, ROW) { + public Object call(int timeout) throws IOException { return null; } }; @@ -439,21 +438,16 @@ public class TestHCM { assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f)); } - MasterCallable<Object> masterCallable = new MasterCallable<Object>(TEST_UTIL.getConnection(), - new RpcControllerFactory(TEST_UTIL.getConfiguration())) { - @Override - protected Object call(PayloadCarryingRpcController rpcController) throws Exception { + MasterCallable masterCallable = new MasterCallable(TEST_UTIL.getConnection()) { + public Object call(int timeout) throws IOException { return null; } }; - try { - for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) { - pauseTime = masterCallable.sleep(baseTime, i); - assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i])); - assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f)); - } - } finally { - masterCallable.close(); + + for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) { + pauseTime = masterCallable.sleep(baseTime, i); + assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i])); + assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f)); } } @@ -1155,6 +1149,7 @@ public class TestHCM { ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(timeMachine); try { + long timeBase = timeMachine.currentTime(); long largeAmountOfTime = ANY_PAUSE * 1000; ConnectionImplementation.ServerErrorTracker tracker = new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100); http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index d99d2ee..354f0a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -35,8 +35,6 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -334,27 +332,26 @@ public class TestReplicaWithCluster { // bulk load HFiles LOG.debug("Loading test data"); + @SuppressWarnings("deprecation") final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection(); table = conn.getTable(hdt.getTableName()); - final String bulkToken = - new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn); - RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn, - new RpcControllerFactory(HTU.getConfiguration()), hdt.getTableName(), - TestHRegionServerBulkLoad.rowkey(0)) { - @Override - protected Void call(PayloadCarryingRpcController controller) throws Exception { - LOG.debug("Going to connect to server " + getLocation() + " for row " + final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn); + RegionServerCallable<Void> callable = new RegionServerCallable<Void>( + conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) { + @Override + public Void call(int timeout) throws Exception { + LOG.debug("Going to connect to server " + getLocation() + " for row " + Bytes.toStringBinary(getRow())); - SecureBulkLoadClient secureClient = null; - byte[] regionName = getLocation().getRegionInfo().getRegionName(); - try (Table table = conn.getTable(getTableName())) { - secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table); - secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, - true, null, bulkToken); + SecureBulkLoadClient secureClient = null; + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + try (Table table = conn.getTable(getTableName())) { + secureClient = new SecureBulkLoadClient(table); + secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, + true, null, bulkToken); + } + return null; } - return null; - } - }; + }; RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration()); RpcRetryingCaller<Void> caller = factory.newCaller(); caller.callWithRetries(callable, 10000); http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java index 30805c0..6e68201 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java @@ -62,8 +62,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; @@ -200,20 +198,19 @@ public class TestHRegionServerBulkLoad { } // bulk load HFiles - final ClusterConnection conn = (ClusterConnection)UTIL.getConnection(); + final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection(); Table table = conn.getTable(tableName); - final String bulkToken = new SecureBulkLoadClient(UTIL.getConfiguration(), table). - prepareBulkLoad(conn); - RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn, - new RpcControllerFactory(UTIL.getConfiguration()), tableName, Bytes.toBytes("aaa")) { + final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn); + RegionServerCallable<Void> callable = + new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) { @Override - public Void call(PayloadCarryingRpcController controller) throws Exception { + public Void call(int callTimeout) throws Exception { LOG.debug("Going to connect to server " + getLocation() + " for row " + Bytes.toStringBinary(getRow())); SecureBulkLoadClient secureClient = null; byte[] regionName = getLocation().getRegionInfo().getRegionName(); try (Table table = conn.getTable(getTableName())) { - secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table); + secureClient = new SecureBulkLoadClient(table); secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, true, null, bulkToken); } @@ -227,15 +224,15 @@ public class TestHRegionServerBulkLoad { // Periodically do compaction to reduce the number of open file handles. if (numBulkLoads.get() % 5 == 0) { // 5 * 50 = 250 open file handles! 

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
index 7560a41..d55adef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
@@ -33,8 +33,6 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -91,12 +89,10 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul

       // bulk load HFiles
       final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
-      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
       RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
-              Bytes.toBytes("aaa")) {
+          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
         @Override
-        protected Void call(PayloadCarryingRpcController controller) throws Exception {
+        public Void call(int callTimeout) throws Exception {
           LOG.info("Non-secure old client");
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
           BulkLoadHFileRequest request =
@@ -113,10 +109,9 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
       // Periodically do compaction to reduce the number of open file handles.
       if (numBulkLoads.get() % 5 == 0) {
         // 5 * 50 = 250 open file handles!
-        callable = new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
-            Bytes.toBytes("aaa")) {
+        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
           @Override
-          protected Void call(PayloadCarryingRpcController controller) throws Exception {
+          public Void call(int callTimeout) throws Exception {
            LOG.debug("compacting " + getLocation() + " for row " +
                Bytes.toStringBinary(getRow()));
            AdminProtos.AdminService.BlockingInterface server =
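The periodic-compaction callables that recur in these hunks have the same reverted shape. Purely for orientation, a condensed sketch of that pattern follows, again using only calls that appear in this patch; the class name CompactRegionSketch, the helper method, and the 10000 ms retry timeout are invented for illustration.

// ---- illustrative sketch, not part of the patch ----
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;

public class CompactRegionSketch {
  /** Major-compact the region serving the given row, going through the retrying RPC caller. */
  public static void compactRegionForRow(final ClusterConnection conn, TableName tableName,
      byte[] row) throws Exception {
    RegionServerCallable<Void> callable =
        new RegionServerCallable<Void>(conn, tableName, row) {
      @Override
      public Void call(int callTimeout) throws Exception {
        // Talk directly to the region server hosting the row's region.
        AdminProtos.AdminService.BlockingInterface server =
            conn.getAdmin(getLocation().getServerName());
        CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
            getLocation().getRegionInfo().getRegionName(), true, null);
        server.compactRegion(null, request);
        return null;
      }
    };
    RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conn.getConfiguration());
    RpcRetryingCaller<Void> caller = factory.newCaller();
    caller.callWithRetries(callable, 10000);
  }
}
// ---- end sketch ----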

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
index 0bc9498..6de6261 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
@@ -33,13 +33,13 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -62,8 +62,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
     super(duration);
   }

-  private static final Log LOG =
-      LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
+  private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);

   @BeforeClass
   public static void setUpBeforeClass() throws IOException {
@@ -104,17 +103,16 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
       final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
       Table table = conn.getTable(tableName);
       final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
-      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
       RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
-              Bytes.toBytes("aaa")) {
+          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
         @Override
-        protected Void call(PayloadCarryingRpcController controller) throws Exception {
-          LOG.debug("Going to connect to server " + getLocation() + " for row " +
-              Bytes.toStringBinary(getRow()));
+        public Void call(int callTimeout) throws Exception {
+          LOG.debug("Going to connect to server " + getLocation() + " for row " +
+              Bytes.toStringBinary(getRow()));
           try (Table table = conn.getTable(getTableName())) {
-            boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths,
-                null, bulkToken, getLocation().getRegionInfo().getStartKey());
+            boolean loaded =
+                new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, null,
+                    bulkToken, getLocation().getRegionInfo().getStartKey());
           }
           return null;
         }
@@ -126,10 +124,9 @@
       // Periodically do compaction to reduce the number of open file handles.
       if (numBulkLoads.get() % 5 == 0) {
         // 5 * 50 = 250 open file handles!
-        callable = new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
-            Bytes.toBytes("aaa")) {
+        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
           @Override
-          protected Void call(PayloadCarryingRpcController controller) throws Exception {
+          public Void call(int callTimeout) throws Exception {
            LOG.debug("compacting " + getLocation() + " for row " +
                Bytes.toStringBinary(getRow()));
            AdminProtos.AdminService.BlockingInterface server =
LOG.debug("Going to connect to server " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); try (Table table = conn.getTable(getTableName())) { - boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, - null, bulkToken, getLocation().getRegionInfo().getStartKey()); + boolean loaded = + new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, null, + bulkToken, getLocation().getRegionInfo().getStartKey()); } return null; } @@ -126,10 +124,9 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS // Periodically do compaction to reduce the number of open file handles. if (numBulkLoads.get() % 5 == 0) { // 5 * 50 = 250 open file handles! - callable = new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName, - Bytes.toBytes("aaa")) { + callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) { @Override - protected Void call(PayloadCarryingRpcController controller) throws Exception { + public Void call(int callTimeout) throws Exception { LOG.debug("compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow())); AdminProtos.AdminService.BlockingInterface server = http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java index 3e90fe1..fa66d69 100644 --- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java +++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java @@ -17,6 +17,8 @@ package org.apache.hadoop.hbase.spark; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; @@ -35,8 +37,6 @@ import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.ByteString; /** * This filter will push down all qualifier logic given to us