Repository: hbase Updated Branches: refs/heads/master e4b6b4afb -> db66e6cc9
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index d705d7c..28db7e8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import com.google.protobuf.RpcChannel; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; import org.apache.hadoop.hbase.filter.BinaryComparator; @@ -45,9 +45,12 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import 
org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; @@ -63,7 +66,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType /** * The implementation of RawAsyncTable. - * <p> + * <p/> * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will * be finished inside the rpc framework thread, which means that the callbacks registered to the * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use @@ -74,6 +77,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType @InterfaceAudience.Private class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { + private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class); + private final AsyncConnectionImpl conn; private final TableName tableName; @@ -204,58 +209,126 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) { return conn.callerFactory.<T> single().table(tableName).row(row) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) - .startLogErrorsCnt(startLogErrorsCnt); + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .startLogErrorsCnt(startLogErrorsCnt); } private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) { return newCaller(row.getRow(), rpcTimeoutNs); } + private CompletableFuture<Result> 
get(Get get, int replicaId, long timeoutNs) { + return this.<Result> newCaller(get, timeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl + .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get, + RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), + (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) + .replicaId(replicaId).call(); + } + + // Connect the two futures, if the src future is done, then mark the dst future as done. And if + // the dst future is done, then cancel the src future. This is used for timeline consistent read. + private <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) { + addListener(srcFuture, (r, e) -> { + if (e != null) { + dstFuture.completeExceptionally(e); + } else { + dstFuture.complete(r); + } + }); + // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture. + // Notice that this is a bit tricky, as the execution chain may be 'complete src -> complete dst + // -> cancel src', for now it seems to be fine, as it will use CAS to set the result first in + // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the + // tie. + addListener(dstFuture, (r, e) -> srcFuture.cancel(false)); + } + + private void timelineConsistentGet(Get get, RegionLocations locs, + CompletableFuture<Result> future) { + if (future.isDone()) { + // do not send requests to secondary replicas if the future is done, i.e., the primary request + // has already been finished. 
+ return; + } + for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) { + CompletableFuture<Result> secondaryFuture = get(get, replicaId, readRpcTimeoutNs); + connect(secondaryFuture, future); + } + } + @Override public CompletableFuture<Result> get(Get get) { - return this.<Result> newCaller(get, readRpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl - .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get, - RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), - (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) - .call(); + CompletableFuture<Result> primaryFuture = + get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs); + if (get.getConsistency() == Consistency.STRONG) { + return primaryFuture; + } + // Timeline consistent read, where we will send requests to other region replicas + CompletableFuture<Result> future = new CompletableFuture<>(); + connect(primaryFuture, future); + long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs(); + long startNs = System.nanoTime(); + addListener(conn.getLocator().getRegionLocations(tableName, get.getRow(), + RegionLocateType.CURRENT, false, readRpcTimeoutNs), (locs, error) -> { + if (error != null) { + LOG.warn( + "Failed to locate all the replicas for table={}, row='{}'," + + " give up timeline consistent read", + tableName, Bytes.toStringBinary(get.getRow()), error); + return; + } + if (locs.size() <= 1) { + LOG.warn( + "There are no secondary replicas for region {}," + " give up timeline consistent read", + locs.getDefaultRegionLocation().getRegion()); + return; + } + long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs); + if (delayNs <= 0) { + timelineConsistentGet(get, locs, future); + } else { + AsyncConnectionImpl.RETRY_TIMER.newTimeout( + timeout -> timelineConsistentGet(get, locs, future), delayNs, TimeUnit.NANOSECONDS); + } + }); + return future; } @Override public 
CompletableFuture<Void> put(Put put) { return this.<Void> newCaller(put, writeRpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub, - put, RequestConverter::buildMutateRequest)) - .call(); + .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub, + put, RequestConverter::buildMutateRequest)) + .call(); } @Override public CompletableFuture<Void> delete(Delete delete) { return this.<Void> newCaller(delete, writeRpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc, - stub, delete, RequestConverter::buildMutateRequest)) - .call(); + .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc, + stub, delete, RequestConverter::buildMutateRequest)) + .call(); } @Override public CompletableFuture<Result> append(Append append) { checkHasFamilies(append); return this.<Result> newCaller(append, rpcTimeoutNs) - .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub, - append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) - .call(); + .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub, + append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) + .call(); } @Override public CompletableFuture<Result> increment(Increment increment) { checkHasFamilies(increment); return this.<Result> newCaller(increment, rpcTimeoutNs) - .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc, - stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) - .call(); + .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc, + stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) + .call(); } private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { @@ -313,36 +386,36 @@ 
class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { public CompletableFuture<Boolean> thenPut(Put put) { preCheck(); return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, - loc, stub, put, - (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p), - (c, r) -> r.getProcessed())) - .call(); + .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc, + stub, put, + (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p), + (c, r) -> r.getProcessed())) + .call(); } @Override public CompletableFuture<Boolean> thenDelete(Delete delete) { preCheck(); return RawAsyncTableImpl.this.<Boolean> newCaller(row, rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller, - loc, stub, delete, - (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d), - (c, r) -> r.getProcessed())) - .call(); + .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller, + loc, stub, delete, + (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d), + (c, r) -> r.getProcessed())) + .call(); } @Override public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { preCheck(); return RawAsyncTableImpl.this.<Boolean> newCaller(mutation, rpcTimeoutNs) - .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc, - stub, mutation, - (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), 
CompareType.valueOf(op.name()), timeRange, rm), - resp -> resp.getExists())) - .call(); + .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc, + stub, mutation, + (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, + new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm), + resp -> resp.getExists())) + .call(); } } @@ -375,10 +448,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { if (ex != null) { future.completeExceptionally(ex instanceof IOException ? ex : new IOException( - "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); + "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); } else { future.complete(respConverter - .apply((Result) multiResp.getResults().get(regionName).result.get(0))); + .apply((Result) multiResp.getResults().get(regionName).result.get(0))); } } catch (IOException e) { future.completeExceptionally(e); @@ -399,7 +472,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm); regionMutationBuilder.setAtomic(true); return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); - }, resp -> null)).call(); + }, resp -> null)) + .call(); } private Scan setDefaultScanConfig(Scan scan) { @@ -416,7 +490,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { public void scan(Scan scan, AdvancedScanResultConsumer consumer) { new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs, - maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start(); + maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start(); } private long resultSize2CacheSize(long maxResultSize) { @@ -427,8 +501,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { @Override public 
ResultScanner getScanner(Scan scan) { return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan), - resultSize2CacheSize( - scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize)); + resultSize2CacheSize( + scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize)); } @Override @@ -477,14 +551,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) { return this.<Object> batch(actions, writeRpcTimeoutNs).stream() - .map(f -> f.<Void> thenApply(r -> null)).collect(toList()); + .map(f -> f.<Void> thenApply(r -> null)).collect(toList()); } private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) { return conn.callerFactory.batch().table(tableName).actions(actions) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call(); + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call(); } @Override @@ -515,7 +589,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, RegionInfo region, byte[] row) { RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, - region, row, rpcTimeoutNs, operationTimeoutNs); + region, row, rpcTimeoutNs, operationTimeoutNs); S stub = stubMaker.apply(channel); CompletableFuture<R> future = new CompletableFuture<>(); ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); @@ -553,10 +627,9 @@ 
class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { } private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker, - ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, - List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive, - AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc, - Throwable error) { + ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs, + byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, + AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { if (error != null) { callback.onError(error); return; @@ -566,11 +639,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { if (locateFinished(region, endKey, endKeyInclusive)) { locateFinished.set(true); } else { - conn.getLocator() - .getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT, - operationTimeoutNs) - .whenComplete((l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, - endKeyInclusive, locateFinished, unfinishedRequest, l, e)); + addListener( + conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT, + operationTimeoutNs), + (l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, + locateFinished, unfinishedRequest, l, e)); } coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> { if (e != null) { @@ -630,11 +703,10 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { @Override public void execute() { - conn.getLocator().getRegionLocation(tableName, startKey, - startKeyInclusive ? 
RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs) - .whenComplete( - (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), - endKey, endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); + addListener(conn.getLocator().getRegionLocation(tableName, startKey, + startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs), + (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, + endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error)); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java new file mode 100644 index 0000000..067e66b --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class for processing futures. + */ +@InterfaceAudience.Private +public final class FutureUtils { + + private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class); + + private FutureUtils() { + } + + /** + * This method is used when you just want to add a listener to the given future. We will call + * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the + * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may + * suppress exceptions thrown from the code that completes the future, and this method will catch + * all the exceptions thrown from the {@code action} to catch possible code bugs. + * <p/> + * And the error prone check will always report FutureReturnValueIgnored because every method in + * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always + * have one future that has not been checked. So we introduce this method and add a suppress + * warnings annotation here. + */ + @SuppressWarnings("FutureReturnValueIgnored") + public static <T> void addListener(CompletableFuture<T> future, + BiConsumer<? super T, ? 
super Throwable> action) { + future.whenComplete((resp, error) -> { + try { + action.accept(resp, error); + } catch (Throwable t) { + LOG.error("Unexpected error caught when processing CompletableFuture", t); + } + }); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java new file mode 100644 index 0000000..c14f69f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.util.Optional; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.util.Bytes; + +final class RegionReplicaTestHelper { + + private RegionReplicaTestHelper() { + } + + // waits for all replicas to have region location + static void waitUntilAllMetaReplicasHavingRegionLocation(AsyncRegistry registry, + int regionReplication) throws IOException { + TestZKAsyncRegistry.TEST_UTIL.waitFor( + TestZKAsyncRegistry.TEST_UTIL.getConfiguration() + .getLong("hbase.client.sync.wait.timeout.msec", 60000), + 200, true, new ExplainingPredicate<IOException>() { + @Override + public String explainFailure() throws IOException { + return "Not all meta replicas get assigned"; + } + + @Override + public boolean evaluate() throws IOException { + try { + RegionLocations locs = registry.getMetaRegionLocation().get(); + if (locs.size() < regionReplication) { + return false; + } + for (int i = 0; i < regionReplication; i++) { + if (locs.getRegionLocation(i) == null) { + return false; + } + } + return true; + } catch (Exception e) { + TestZKAsyncRegistry.LOG.warn("Failed to get meta region locations", e); + return false; + } + } + }); + } + + static Optional<ServerName> getRSCarryingReplica(HBaseTestingUtility util, TableName tableName, + int replicaId) { + return util.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) + .filter(rs -> rs.getRegions(tableName).stream() + .anyMatch(r -> r.getRegionInfo().getReplicaId() == replicaId)) + 
.findAny().map(rs -> rs.getServerName()); + } + + /** + * Return the new location. + */ + static ServerName moveRegion(HBaseTestingUtility util, HRegionLocation currentLoc) + throws Exception { + ServerName serverName = currentLoc.getServerName(); + RegionInfo regionInfo = currentLoc.getRegion(); + TableName tableName = regionInfo.getTable(); + int replicaId = regionInfo.getReplicaId(); + ServerName newServerName = util.getHBaseCluster().getRegionServerThreads().stream() + .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny() + .get(); + util.getAdmin().move(regionInfo.getEncodedNameAsBytes(), + Bytes.toBytes(newServerName.getServerName())); + util.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + Optional<ServerName> newServerName = getRSCarryingReplica(util, tableName, replicaId); + return newServerName.isPresent() && !newServerName.get().equals(serverName); + } + + @Override + public String explainFailure() throws Exception { + return regionInfo.getRegionNameAsString() + " is still on " + serverName; + } + }); + return newServerName; + } + + interface Locator { + RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload) + throws Exception; + + void updateCachedLocationOnError(HRegionLocation loc, Throwable error) throws Exception; + } + + static void testLocator(HBaseTestingUtility util, TableName tableName, Locator locator) + throws Exception { + RegionLocations locs = + locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false); + assertEquals(3, locs.size()); + for (int i = 0; i < 3; i++) { + HRegionLocation loc = locs.getRegionLocation(i); + assertNotNull(loc); + ServerName serverName = getRSCarryingReplica(util, tableName, i).get(); + assertEquals(serverName, loc.getServerName()); + } + ServerName newServerName = moveRegion(util, locs.getDefaultRegionLocation()); + // The cached location should not 
be changed + assertEquals(locs.getDefaultRegionLocation().getServerName(), + locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false) + .getDefaultRegionLocation().getServerName()); + // should get the new location when reload = true + assertEquals(newServerName, + locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, true) + .getDefaultRegionLocation().getServerName()); + // the cached location should be replaced + assertEquals(newServerName, + locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false) + .getDefaultRegionLocation().getServerName()); + + ServerName newServerName1 = moveRegion(util, locs.getRegionLocation(1)); + ServerName newServerName2 = moveRegion(util, locs.getRegionLocation(2)); + + // The cached location should not be changed + assertEquals(locs.getRegionLocation(1).getServerName(), + locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName()); + // clear the cached location for replica 1 + locator.updateCachedLocationOnError(locs.getRegionLocation(1), new NotServingRegionException()); + // the cached location for replica 2 should not be changed + assertEquals(locs.getRegionLocation(2).getServerName(), + locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName()); + // should get the new location as we have cleared the old location + assertEquals(newServerName1, + locator.getRegionLocations(tableName, 1, false).getRegionLocation(1).getServerName()); + // as we will get the new location for replica 2 at once, we should also get the new location + // for replica 2 + assertEquals(newServerName2, + locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java index 7c08d6d..df1fe08 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java @@ -17,20 +17,19 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; +import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator; -import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -42,7 +41,7 @@ public class TestAsyncMetaRegionLocator { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class); + HBaseClassTestRule.forClass(TestAsyncMetaRegionLocator.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -53,10 +52,11 @@ public class TestAsyncMetaRegionLocator { @BeforeClass public static void setUp() throws Exception { 
TEST_UTIL.getConfiguration().set(BaseLoadBalancer.TABLES_ON_MASTER, "none"); + TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3); TEST_UTIL.startMiniCluster(3); - TEST_UTIL.waitUntilAllSystemRegionsAssigned(); - TEST_UTIL.getAdmin().setBalancerRunning(false, true); REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3); + TEST_UTIL.getAdmin().balancerSwitch(false, true); LOCATOR = new AsyncMetaRegionLocator(REGISTRY); } @@ -66,42 +66,21 @@ public class TestAsyncMetaRegionLocator { TEST_UTIL.shutdownMiniCluster(); } - private Optional<ServerName> getRSCarryingMeta() { - return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() - .map(t -> t.getRegionServer()) - .filter(rs -> !rs.getRegions(TableName.META_TABLE_NAME).isEmpty()).findAny() - .map(rs -> rs.getServerName()); - } - @Test - public void testReload() throws Exception { - ServerName serverName = getRSCarryingMeta().get(); - assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName()); - - ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() - .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)) - .findAny().get(); - TEST_UTIL.getAdmin().move(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), - Bytes.toBytes(newServerName.getServerName())); - TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { + public void test() throws Exception { + testLocator(TEST_UTIL, TableName.META_TABLE_NAME, new Locator() { @Override - public boolean evaluate() throws Exception { - Optional<ServerName> newServerName = getRSCarryingMeta(); - return newServerName.isPresent() && !newServerName.get().equals(serverName); + public void updateCachedLocationOnError(HRegionLocation loc, Throwable error) + throws Exception { + LOCATOR.updateCachedLocationOnError(loc, error); } @Override - public String 
explainFailure() throws Exception { - return HRegionInfo.FIRST_META_REGIONINFO.getRegionNameAsString() + " is still on " + - serverName; + public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload) + throws Exception { + return LOCATOR.getRegionLocations(replicaId, reload).get(); } }); - // The cached location will not change - assertEquals(serverName, LOCATOR.getRegionLocation(false).get().getServerName()); - // should get the new location when reload = true - assertEquals(newServerName, LOCATOR.getRegionLocation(true).get().getServerName()); - // the cached location should be replaced - assertEquals(newServerName, LOCATOR.getRegionLocation(false).get().getServerName()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java index 38dc78d..eeaf99f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; +import static org.apache.hadoop.hbase.client.RegionReplicaTestHelper.testLocator; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -38,10 +39,12 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import 
org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -58,7 +61,7 @@ public class TestAsyncNonMetaRegionLocator { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class); + HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -78,7 +81,7 @@ public class TestAsyncNonMetaRegionLocator { TEST_UTIL.getAdmin().balancerSwitch(false, true); AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, - registry.getClusterId().get(), User.getCurrent()); + registry.getClusterId().get(), User.getCurrent()); LOCATOR = new AsyncNonMetaRegionLocator(CONN); SPLIT_KEYS = new byte[8][]; for (int i = 111; i < 999; i += 111) { @@ -109,11 +112,18 @@ public class TestAsyncNonMetaRegionLocator { TEST_UTIL.waitTableAvailable(TABLE_NAME); } + private CompletableFuture<HRegionLocation> getDefaultRegionLocation(TableName tableName, + byte[] row, RegionLocateType locateType, boolean reload) { + return LOCATOR + .getRegionLocations(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID, locateType, reload) + .thenApply(RegionLocations::getDefaultRegionLocation); + } + @Test public void testNoTable() throws InterruptedException { for (RegionLocateType locateType : RegionLocateType.values()) { try { - 
LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get(); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); } @@ -126,7 +136,7 @@ public class TestAsyncNonMetaRegionLocator { TEST_UTIL.getAdmin().disableTable(TABLE_NAME); for (RegionLocateType locateType : RegionLocateType.values()) { try { - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get(); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(TableNotFoundException.class)); } @@ -148,13 +158,13 @@ public class TestAsyncNonMetaRegionLocator { ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); for (RegionLocateType locateType : RegionLocateType.values()) { assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); } byte[] randKey = new byte[ThreadLocalRandom.current().nextInt(128)]; ThreadLocalRandom.current().nextBytes(randKey); for (RegionLocateType locateType : RegionLocateType.values()) { assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getRegionLocation(TABLE_NAME, randKey, locateType, false).get()); + getDefaultRegionLocation(TABLE_NAME, randKey, locateType, false).get()); } } @@ -179,12 +189,12 @@ public class TestAsyncNonMetaRegionLocator { private ServerName[] getLocations(byte[][] startKeys) { ServerName[] serverNames = new ServerName[startKeys.length]; TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) - .forEach(rs -> { - rs.getRegions(TABLE_NAME).forEach(r -> { - serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(), - Bytes::compareTo)] 
= rs.getServerName(); - }); + .forEach(rs -> { + rs.getRegions(TABLE_NAME).forEach(r -> { + serverNames[Arrays.binarySearch(startKeys, r.getRegionInfo().getStartKey(), + Bytes::compareTo)] = rs.getServerName(); }); + }); return serverNames; } @@ -196,8 +206,9 @@ public class TestAsyncNonMetaRegionLocator { IntStream.range(0, 2).forEach(n -> IntStream.range(0, startKeys.length).forEach(i -> { try { assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1], - serverNames[i], LOCATOR - .getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false).get()); + serverNames[i], + getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.CURRENT, false) + .get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -208,7 +219,7 @@ public class TestAsyncNonMetaRegionLocator { try { assertLocEquals(startKeys[i], i == startKeys.length - 1 ? EMPTY_END_ROW : startKeys[i + 1], serverNames[i], - LOCATOR.getRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get()); + getDefaultRegionLocation(TABLE_NAME, startKeys[i], RegionLocateType.AFTER, false).get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -220,8 +231,7 @@ public class TestAsyncNonMetaRegionLocator { n -> IntStream.range(0, endKeys.length).map(i -> endKeys.length - 1 - i).forEach(i -> { try { assertLocEquals(i == 0 ? 
EMPTY_START_ROW : endKeys[i - 1], endKeys[i], serverNames[i], - LOCATOR.getRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false) - .get()); + getDefaultRegionLocation(TABLE_NAME, endKeys[i], RegionLocateType.BEFORE, false).get()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -232,29 +242,29 @@ public class TestAsyncNonMetaRegionLocator { public void testRegionMove() throws IOException, InterruptedException, ExecutionException { createSingleRegionTable(); ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); - HRegionLocation loc = LOCATOR - .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get(); + HRegionLocation loc = + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get(); assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, loc); ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() - .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)) - .findAny().get(); + .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny() + .get(); TEST_UTIL.getAdmin().move(Bytes.toBytes(loc.getRegion().getEncodedName()), Bytes.toBytes(newServerName.getServerName())); while (!TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName() - .equals(newServerName)) { + .equals(newServerName)) { Thread.sleep(100); } // Should be same as it is in cache - assertSame(loc, LOCATOR - .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); - LOCATOR.updateCachedLocation(loc, null); + assertSame(loc, + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); + LOCATOR.updateCachedLocationOnError(loc, null); // null error will not trigger a cache cleanup - assertSame(loc, LOCATOR - .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, 
RegionLocateType.CURRENT, false).get()); - LOCATOR.updateCachedLocation(loc, new NotServingRegionException()); - assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, LOCATOR - .getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); + assertSame(loc, + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); + LOCATOR.updateCachedLocationOnError(loc, new NotServingRegionException()); + assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, false).get()); } // usually locate after will return the same result, so we add a test to make it return different @@ -266,21 +276,21 @@ public class TestAsyncNonMetaRegionLocator { TEST_UTIL.createTable(TABLE_NAME, FAMILY, new byte[][] { splitKey }); TEST_UTIL.waitTableAvailable(TABLE_NAME); HRegionLocation currentLoc = - LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get(); + getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.CURRENT, false).get(); ServerName currentServerName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); assertLocEquals(EMPTY_START_ROW, splitKey, currentServerName, currentLoc); HRegionLocation afterLoc = - LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get(); + getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get(); ServerName afterServerName = - TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) - .filter(rs -> rs.getRegions(TABLE_NAME).stream() - .anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey()))) - .findAny().get().getServerName(); + TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) + .filter(rs -> rs.getRegions(TABLE_NAME).stream() + .anyMatch(r -> Bytes.equals(splitKey, r.getRegionInfo().getStartKey()))) + 
.findAny().get().getServerName(); assertLocEquals(splitKey, EMPTY_END_ROW, afterServerName, afterLoc); assertSame(afterLoc, - LOCATOR.getRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get()); + getDefaultRegionLocation(TABLE_NAME, row, RegionLocateType.AFTER, false).get()); } // For HBASE-17402 @@ -292,9 +302,9 @@ public class TestAsyncNonMetaRegionLocator { ServerName[] serverNames = getLocations(startKeys); for (int i = 0; i < 100; i++) { LOCATOR.clearCache(TABLE_NAME); - List<CompletableFuture<HRegionLocation>> futures = IntStream.range(0, 1000) - .mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s)) - .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false)) + List<CompletableFuture<HRegionLocation>> futures = + IntStream.range(0, 1000).mapToObj(n -> String.format("%03d", n)).map(s -> Bytes.toBytes(s)) + .map(r -> getDefaultRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false)) .collect(toList()); for (int j = 0; j < 1000; j++) { int index = Math.min(8, j / 111); @@ -309,11 +319,11 @@ public class TestAsyncNonMetaRegionLocator { ServerName serverName = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME).getServerName(); for (RegionLocateType locateType : RegionLocateType.values()) { assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); } ServerName newServerName = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() - .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)) - .findAny().get(); + .map(t -> t.getRegionServer().getServerName()).filter(sn -> !sn.equals(serverName)).findAny() + .get(); Admin admin = TEST_UTIL.getAdmin(); RegionInfo region = admin.getRegions(TABLE_NAME).stream().findAny().get(); admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(newServerName.getServerName())); 
@@ -334,15 +344,15 @@ public class TestAsyncNonMetaRegionLocator { // The cached location will not change for (RegionLocateType locateType : RegionLocateType.values()) { assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, serverName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); } // should get the new location when reload = true assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get()); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get()); // the cached location should be replaced for (RegionLocateType locateType : RegionLocateType.values()) { assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName, - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); + getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get()); } } @@ -351,10 +361,32 @@ public class TestAsyncNonMetaRegionLocator { public void testLocateBeforeLastRegion() throws IOException, InterruptedException, ExecutionException { createMultiRegionTable(); - LOCATOR.getRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join(); + getDefaultRegionLocation(TABLE_NAME, SPLIT_KEYS[0], RegionLocateType.CURRENT, false).join(); HRegionLocation loc = - LOCATOR.getRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get(); + getDefaultRegionLocation(TABLE_NAME, EMPTY_END_ROW, RegionLocateType.BEFORE, false).get(); // should locate to the last region assertArrayEquals(loc.getRegion().getEndKey(), EMPTY_END_ROW); } + + @Test + public void testRegionReplicas() throws Exception { + TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3).build()); + 
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME); + testLocator(TEST_UTIL, TABLE_NAME, new Locator() { + + @Override + public void updateCachedLocationOnError(HRegionLocation loc, Throwable error) + throws Exception { + LOCATOR.updateCachedLocationOnError(loc, error); + } + + @Override + public RegionLocations getRegionLocations(TableName tableName, int replicaId, boolean reload) + throws Exception { + return LOCATOR.getRegionLocations(tableName, EMPTY_START_ROW, replicaId, + RegionLocateType.CURRENT, reload).get(); + } + }); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java index c6624e7..8cdb4a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java @@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; @@ -59,7 +60,7 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class); + HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class); 
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -124,10 +125,10 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit { TEST_UTIL.getAdmin().balancerSwitch(false, true); AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, - registry.getClusterId().get(), User.getCurrent()); + registry.getClusterId().get(), User.getCurrent()); LOCATOR = new AsyncNonMetaRegionLocator(CONN); SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) - .toArray(byte[][]::new); + .toArray(byte[][]::new); TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS); TEST_UTIL.waitTableAvailable(TABLE_NAME); } @@ -138,11 +139,11 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit { TEST_UTIL.shutdownMiniCluster(); } - private void assertLocs(List<CompletableFuture<HRegionLocation>> futures) + private void assertLocs(List<CompletableFuture<RegionLocations>> futures) throws InterruptedException, ExecutionException { assertEquals(256, futures.size()); for (int i = 0; i < futures.size(); i++) { - HRegionLocation loc = futures.get(i).get(); + HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation(); if (i == 0) { assertTrue(isEmptyStartRow(loc.getRegion().getStartKey())); } else { @@ -158,10 +159,11 @@ public class TestAsyncNonMetaRegionLocatorConcurrenyLimit { @Test public void test() throws InterruptedException, ExecutionException { - List<CompletableFuture<HRegionLocation>> futures = - IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) - .map(r -> LOCATOR.getRegionLocation(TABLE_NAME, r, RegionLocateType.CURRENT, false)) - .collect(toList()); + List<CompletableFuture<RegionLocations>> futures = + IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) + .map(r -> LOCATOR.getRegionLocations(TABLE_NAME, r, RegionReplicaUtil.DEFAULT_REPLICA_ID, + 
RegionLocateType.CURRENT, false)) + .collect(toList()); assertLocs(futures); assertTrue("max allowed is " + MAX_ALLOWED + " but actual is " + MAX_CONCURRENCY.get(), MAX_CONCURRENCY.get() <= MAX_ALLOWED); http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index a6c2efb..7d8956b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -49,7 +49,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class); + HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -73,7 +73,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { TEST_UTIL.waitTableAvailable(TABLE_NAME); AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, - registry.getClusterId().get(), User.getCurrent()); + registry.getClusterId().get(), User.getCurrent()); } @AfterClass @@ -89,8 +89,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller { int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName()); TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), Bytes.toBytes( TEST_UTIL.getHBaseCluster().getRegionServer(1 - 
index).getServerName().getServerName())); - AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME) - .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(30).build(); + AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS) + .setMaxRetries(30).build(); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); // move back @@ -110,8 +110,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller { public void testMaxRetries() throws IOException, InterruptedException { try { CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS) - .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS) - .action((controller, loc, stub) -> failedFuture()).call().get(); + .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS) + .action((controller, loc, stub) -> failedFuture()).call().get(); fail(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); @@ -123,8 +123,8 @@ public class TestAsyncSingleRequestRpcRetryingCaller { long startNs = System.nanoTime(); try { CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS) - .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE) - .action((controller, loc, stub) -> failedFuture()).call().get(); + .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE) + .action((controller, loc, stub) -> failedFuture()).call().get(); fail(); } catch (ExecutionException e) { e.printStackTrace(); @@ -141,30 +141,30 @@ public class TestAsyncSingleRequestRpcRetryingCaller { AtomicInteger count = new AtomicInteger(0); HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); AsyncRegionLocator mockedLocator = - new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) { - @Override - CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, - RegionLocateType locateType, long timeoutNs) { - if 
(tableName.equals(TABLE_NAME)) { - CompletableFuture<HRegionLocation> future = new CompletableFuture<>(); - if (count.getAndIncrement() == 0) { - errorTriggered.set(true); - future.completeExceptionally(new RuntimeException("Inject error!")); - } else { - future.complete(loc); - } - return future; + new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) { + @Override + CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row, + int replicaId, RegionLocateType locateType, long timeoutNs) { + if (tableName.equals(TABLE_NAME)) { + CompletableFuture<HRegionLocation> future = new CompletableFuture<>(); + if (count.getAndIncrement() == 0) { + errorTriggered.set(true); + future.completeExceptionally(new RuntimeException("Inject error!")); } else { - return super.getRegionLocation(tableName, row, locateType, timeoutNs); + future.complete(loc); } + return future; + } else { + return super.getRegionLocation(tableName, row, replicaId, locateType, timeoutNs); } + } - @Override - void updateCachedLocation(HRegionLocation loc, Throwable exception) { - } - }; + @Override + void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { + } + }; try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(), - CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) { + CONN.registry, CONN.registry.getClusterId().get(), User.getCurrent()) { @Override AsyncRegionLocator getLocator() { @@ -172,7 +172,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { } }) { AsyncTable<?> table = mockedConn.getTableBuilder(TABLE_NAME) - .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build(); + .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build(); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); assertTrue(errorTriggered.get()); errorTriggered.set(false); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java index 13d8000..6c6bb98 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableLocatePrefetch.java @@ -69,8 +69,8 @@ public class TestAsyncTableLocatePrefetch { @Test public void test() throws InterruptedException, ExecutionException { - assertNotNull(LOCATOR - .getRegionLocation(TABLE_NAME, Bytes.toBytes("zzz"), RegionLocateType.CURRENT, false).get()); + assertNotNull(LOCATOR.getRegionLocations(TABLE_NAME, Bytes.toBytes("zzz"), + RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false).get()); // we finish the request before we adding the remaining results to cache so sleep a bit here Thread.sleep(1000); // confirm that the locations of all the regions have been cached. http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java new file mode 100644 index 0000000..0445a0e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.junit.AfterClass; +import 
org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableRegionReplicasGet { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasGet.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("async"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] QUALIFIER = Bytes.toBytes("cq"); + + private static byte[] ROW = Bytes.toBytes("row"); + + private static byte[] VALUE = Bytes.toBytes("value"); + + private static AsyncConnection ASYNC_CONN; + + @Rule + public TestName testName = new TestName(); + + @Parameter + public Supplier<AsyncTable<?>> getTable; + + private static AsyncTable<?> getRawTable() { + return ASYNC_CONN.getTable(TABLE_NAME); + } + + private static AsyncTable<?> getTable() { + return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + } + + @Parameters + public static List<Object[]> params() { + return Arrays.asList(new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getRawTable }, + new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getTable }); + } + + private static volatile boolean FAIL_PRIMARY_GET = false; + + private static AtomicInteger PRIMARY_GET_COUNT = new AtomicInteger(0); + + private static AtomicInteger SECONDARY_GET_COUNT = new AtomicInteger(0); + + public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor { + + @Override + public Optional<RegionObserver> getRegionObserver() { + return 
Optional.of(this); + } + + @Override + public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, + List<Cell> result) throws IOException { + RegionInfo region = c.getEnvironment().getRegionInfo(); + if (!region.getTable().equals(TABLE_NAME)) { + return; + } + if (region.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) { + SECONDARY_GET_COUNT.incrementAndGet(); + } else { + PRIMARY_GET_COUNT.incrementAndGet(); + if (FAIL_PRIMARY_GET) { + throw new IOException("Inject error"); + } + } + } + } + + private static boolean allReplicasHaveRow() throws IOException { + for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) { + for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) { + if (region.get(new Get(ROW), false).isEmpty()) { + return false; + } + } + } + return true; + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // 10 mins + TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + TimeUnit.MINUTES.toMillis(10)); + // 1 second + TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND, + TimeUnit.SECONDS.toMicros(1)); + // set a small pause so we will retry very quickly + TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10); + // infinite retry + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE); + TEST_UTIL.startMiniCluster(3); + TEST_UTIL.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(3) + .setCoprocessor(FailPrimaryGetCP.class.getName()).build()); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME); + table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); + // this is the 
fastest way to let all replicas have the row + TEST_UTIL.getAdmin().disableTable(TABLE_NAME); + TEST_UTIL.getAdmin().enableTable(TABLE_NAME); + TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + IOUtils.closeQuietly(ASYNC_CONN); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testNoReplicaRead() throws Exception { + FAIL_PRIMARY_GET = false; + SECONDARY_GET_COUNT.set(0); + AsyncTable<?> table = getTable.get(); + Get get = new Get(ROW).setConsistency(Consistency.TIMELINE); + for (int i = 0; i < 1000; i++) { + assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER)); + } + // the primary region is fine and the primary timeout is 1 second which is long enough, so we + // should not send any requests to secondary replicas even if the consistency is timeline. + Thread.sleep(5000); + assertEquals(0, SECONDARY_GET_COUNT.get()); + } + + @Test + public void testReplicaRead() throws Exception { + // fail the primary get request + FAIL_PRIMARY_GET = true; + Get get = new Get(ROW).setConsistency(Consistency.TIMELINE); + // make sure that we could still get the value from secondary replicas + AsyncTable<?> table = getTable.get(); + assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER)); + // make sure that the primary request has been canceled + Thread.sleep(5000); + int count = PRIMARY_GET_COUNT.get(); + Thread.sleep(10000); + assertEquals(count, PRIMARY_GET_COUNT.get()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/db66e6cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java index db7546f..46890d0 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java @@ -25,7 +25,6 @@ import static org.junit.Assert.assertNotSame; import java.io.IOException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; @@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; @@ -52,43 +50,13 @@ public class TestZKAsyncRegistry { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestZKAsyncRegistry.class); + HBaseClassTestRule.forClass(TestZKAsyncRegistry.class); - private static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + static final Logger LOG = LoggerFactory.getLogger(TestZKAsyncRegistry.class); + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static ZKAsyncRegistry REGISTRY; - // waits for all replicas to have region location - static void waitUntilAllReplicasHavingRegionLocation(TableName tbl) throws IOException { - TEST_UTIL.waitFor( - TEST_UTIL.getConfiguration().getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true, - new ExplainingPredicate<IOException>() { - @Override - public String explainFailure() throws IOException { - return TEST_UTIL.explainTableAvailability(tbl); - } - - @Override - public boolean evaluate() throws 
IOException { - AtomicBoolean ready = new AtomicBoolean(true); - try { - RegionLocations locs = REGISTRY.getMetaRegionLocation().get(); - assertEquals(3, locs.getRegionLocations().length); - IntStream.range(0, 3).forEach(i -> { - HRegionLocation loc = locs.getRegionLocation(i); - if (loc == null) { - ready.set(false); - } - }); - } catch (Exception e) { - ready.set(false); - } - return ready.get(); - } - }); - } - @BeforeClass public static void setUp() throws Exception { TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3); @@ -107,14 +75,14 @@ public class TestZKAsyncRegistry { LOG.info("STARTED TEST"); String clusterId = REGISTRY.getClusterId().get(); String expectedClusterId = TEST_UTIL.getHBaseCluster().getMaster().getClusterId(); - assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, - expectedClusterId, clusterId); + assertEquals("Expected " + expectedClusterId + ", found=" + clusterId, expectedClusterId, + clusterId); assertEquals(TEST_UTIL.getHBaseCluster().getClusterMetrics().getLiveServerMetrics().size(), REGISTRY.getCurrentNrHRS().get().intValue()); assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(), REGISTRY.getMasterAddress().get()); assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue()); - waitUntilAllReplicasHavingRegionLocation(TableName.META_TABLE_NAME); + RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3); RegionLocations locs = REGISTRY.getMetaRegionLocation().get(); assertEquals(3, locs.getRegionLocations().length); IntStream.range(0, 3).forEach(i -> {