Apache9 commented on a change in pull request #1593:
URL: https://github.com/apache/hbase/pull/1593#discussion_r419840354
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -117,105 +147,97 @@ public static String getMasterAddr(Configuration conf)
throws UnknownHostExcepti
return String.format("%s:%d", hostname, port);
}
- /**
- * @return Stub needed to make RPC using a hedged channel to the master end
points.
- */
- private ClientMetaService.Interface getMasterStub() throws IOException {
- return ClientMetaService.newStub(
- rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(),
rpcTimeoutMs));
+ @FunctionalInterface
+ private interface Callable<T> {
+ void call(HBaseRpcController controller, ClientMetaService.Interface stub,
RpcCallback<T> done);
}
- /**
- * Parses the list of master addresses from the provided configuration.
Supported format is
- * comma separated host[:port] values. If no port number if specified,
default master port is
- * assumed.
- * @param conf Configuration to parse from.
- */
- private void parseMasterAddrs(Configuration conf) throws
UnknownHostException {
- String configuredMasters = getMasterAddr(conf);
- for (String masterAddr:
configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
- HostAndPort masterHostPort =
-
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
- masterServers.add(ServerName.valueOf(masterHostPort.toString(),
ServerName.NON_STARTCODE));
- }
- Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master
address is needed");
+ private <T extends Message> CompletableFuture<T>
call(ClientMetaService.Interface stub,
+ Callable<T> callable) {
+ HBaseRpcController controller = rpcControllerFactory.newController();
+ CompletableFuture<T> future = new CompletableFuture<>();
+ callable.call(controller, stub, resp -> {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ future.complete(resp);
+ }
+ });
+ return future;
}
- @VisibleForTesting
- public Set<ServerName> getParsedMasterServers() {
- return Collections.unmodifiableSet(masterServers);
+ private IOException badResponse(String debug) {
+ return new IOException(String.format("Invalid result for request %s. Will
be retried", debug));
}
- /**
- * Returns a call back that can be passed along to the non-blocking rpc
call. It is invoked once
- * the rpc finishes and the response is propagated to the passed future.
- * @param future Result future to which the rpc response is propagated.
- * @param isValidResp Checks if the rpc response has a valid result.
- * @param transformResult Transforms the result to a different form as
expected by callers.
- * @param hrc RpcController instance for this rpc.
- * @param debug Debug message passed along to the caller in case of
exceptions.
- * @param <T> RPC result type.
- * @param <R> Transformed type of the result.
- * @return A call back that can be embedded in the non-blocking rpc call.
- */
- private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
- Predicate<T> isValidResp, Function<T, R> transformResult,
HBaseRpcController hrc,
- final String debug) {
- return rpcResult -> {
- if (rpcResult == null) {
- future.completeExceptionally(
- new MasterRegistryFetchException(masterServers, hrc.getFailed()));
- return;
- }
- if (!isValidResp.test(rpcResult)) {
- // Rpc returned ok, but result was malformed.
- future.completeExceptionally(new IOException(
- String.format("Invalid result for request %s. Will be retried",
debug)));
- return;
- }
- future.complete(transformResult.apply(rpcResult));
- };
+ // send requests concurrently to hedgedReadsFanout masters
+ private <T extends Message> void groupCall(CompletableFuture<T> future, int
startIndexInclusive,
+ Callable<T> callable, Predicate<T> isValidResp, String debug,
+ ConcurrentLinkedQueue<Throwable> errors) {
+ int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut,
masterStubs.size());
+ AtomicInteger remaining = new AtomicInteger(endIndexExclusive -
startIndexInclusive);
+ for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
+ addListener(call(masterStubs.get(i), callable), (r, e) -> {
+ // a simple check to skip all the later operations earlier
+ if (future.isDone()) {
+ return;
+ }
+ if (e == null && !isValidResp.test(r)) {
+ e = badResponse(debug);
+ }
+ if (e != null) {
+ // make sure when remaining reaches 0 we have all exceptions in the
errors queue
+ errors.add(e);
+ if (remaining.decrementAndGet() == 0) {
+ if (endIndexExclusive == masterStubs.size()) {
+ // we are done, complete the future with exception
+ RetriesExhaustedException ex = new
RetriesExhaustedException("masters",
Review comment:
The RetriesExhaustedException is used to wrap all the exceptions and
then it will be wrapped by the MasterRegistryFetchException to include all the
master addresses.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]