bharathv commented on a change in pull request #1593:
URL: https://github.com/apache/hbase/pull/1593#discussion_r419578450
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -61,53 +65,79 @@
/**
* Master based registry implementation. Makes RPCs to the configured master
addresses from config
* {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
- *
+ * <p/>
* It supports hedged reads, which can be enabled by setting
* {@value
org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to
True. Fan
Review comment:
nit: The HConstants references in this Javadoc need to be updated.
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -61,53 +65,79 @@
/**
* Master based registry implementation. Makes RPCs to the configured master
addresses from config
* {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
- *
+ * <p/>
* It supports hedged reads, which can be enabled by setting
* {@value
org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to
True. Fan
* out the requests batch is controlled by
* {@value
org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}.
- *
+ * <p/>
* TODO: Handle changes to the configuration dynamically without having to
restart the client.
*/
@InterfaceAudience.Private
public class MasterRegistry implements ConnectionRegistry {
+
+ /** Configuration key that controls the fan out of requests **/
+ public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
+ "hbase.client.master_registry.hedged.fanout";
+
+ /** Default value for the fan out of hedged requests. **/
+ public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2;
+
private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
+ private final int hedgedReadFanOut;
+
// Configured list of masters to probe the meta information from.
- private final Set<ServerName> masterServers;
+ private final Set<ServerName> masterAddrs;
+
+ private final List<ClientMetaService.Interface> masterStubs;
// RPC client used to talk to the masters.
private final RpcClient rpcClient;
private final RpcControllerFactory rpcControllerFactory;
- private final int rpcTimeoutMs;
-
- MasterRegistry(Configuration conf) throws UnknownHostException {
- boolean hedgedReadsEnabled =
conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY,
- MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT);
- Configuration finalConf;
- if (!hedgedReadsEnabled) {
- // If hedged reads are disabled, it is equivalent to setting a fan out
of 1. We make a copy of
- // the configuration so that other places reusing this reference is not
affected.
- finalConf = new Configuration(conf);
- finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1);
- } else {
- finalConf = conf;
+
+ /**
+ * Parses the list of master addresses from the provided configuration.
Supported format is comma
+ * separated host[:port] values. If no port number if specified, default
master port is assumed.
+ * @param conf Configuration to parse from.
+ */
+ private static Set<ServerName> parseMasterAddrs(Configuration conf) throws
UnknownHostException {
+ Set<ServerName> masterAddrs = new HashSet<>();
+ String configuredMasters = getMasterAddr(conf);
+ for (String masterAddr :
configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
+ HostAndPort masterHostPort =
+
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
+ masterAddrs.add(ServerName.valueOf(masterHostPort.toString(),
ServerName.NON_STARTCODE));
}
- if (conf.get(MASTER_ADDRS_KEY) != null) {
- finalConf.set(MASTER_ADDRS_KEY, conf.get(MASTER_ADDRS_KEY));
+ Preconditions.checkArgument(!masterAddrs.isEmpty(), "At least one master
address is needed");
+ return masterAddrs;
+ }
+
+ MasterRegistry(Configuration conf) throws IOException {
+ this.hedgedReadFanOut = conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
+ MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
+ int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
+ conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+ // XXX: we pass cluster id as null here since we do not have a cluster id
yet, we have to fetch
Review comment:
I'm starting to wonder why this didn't get flagged in tests. I guess there
is a gap in test coverage for token-based auth.
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -117,105 +147,97 @@ public static String getMasterAddr(Configuration conf)
throws UnknownHostExcepti
return String.format("%s:%d", hostname, port);
}
- /**
- * @return Stub needed to make RPC using a hedged channel to the master end
points.
- */
- private ClientMetaService.Interface getMasterStub() throws IOException {
- return ClientMetaService.newStub(
- rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(),
rpcTimeoutMs));
+ @FunctionalInterface
+ private interface Callable<T> {
+ void call(HBaseRpcController controller, ClientMetaService.Interface stub,
RpcCallback<T> done);
}
- /**
- * Parses the list of master addresses from the provided configuration.
Supported format is
- * comma separated host[:port] values. If no port number if specified,
default master port is
- * assumed.
- * @param conf Configuration to parse from.
- */
- private void parseMasterAddrs(Configuration conf) throws
UnknownHostException {
- String configuredMasters = getMasterAddr(conf);
- for (String masterAddr:
configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
- HostAndPort masterHostPort =
-
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
- masterServers.add(ServerName.valueOf(masterHostPort.toString(),
ServerName.NON_STARTCODE));
- }
- Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master
address is needed");
+ private <T extends Message> CompletableFuture<T>
call(ClientMetaService.Interface stub,
+ Callable<T> callable) {
+ HBaseRpcController controller = rpcControllerFactory.newController();
+ CompletableFuture<T> future = new CompletableFuture<>();
+ callable.call(controller, stub, resp -> {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ future.complete(resp);
+ }
+ });
+ return future;
}
- @VisibleForTesting
- public Set<ServerName> getParsedMasterServers() {
- return Collections.unmodifiableSet(masterServers);
+ private IOException badResponse(String debug) {
+ return new IOException(String.format("Invalid result for request %s. Will
be retried", debug));
}
- /**
- * Returns a call back that can be passed along to the non-blocking rpc
call. It is invoked once
- * the rpc finishes and the response is propagated to the passed future.
- * @param future Result future to which the rpc response is propagated.
- * @param isValidResp Checks if the rpc response has a valid result.
- * @param transformResult Transforms the result to a different form as
expected by callers.
- * @param hrc RpcController instance for this rpc.
- * @param debug Debug message passed along to the caller in case of
exceptions.
- * @param <T> RPC result type.
- * @param <R> Transformed type of the result.
- * @return A call back that can be embedded in the non-blocking rpc call.
- */
- private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
- Predicate<T> isValidResp, Function<T, R> transformResult,
HBaseRpcController hrc,
- final String debug) {
- return rpcResult -> {
- if (rpcResult == null) {
- future.completeExceptionally(
- new MasterRegistryFetchException(masterServers, hrc.getFailed()));
- return;
- }
- if (!isValidResp.test(rpcResult)) {
- // Rpc returned ok, but result was malformed.
- future.completeExceptionally(new IOException(
- String.format("Invalid result for request %s. Will be retried",
debug)));
- return;
- }
- future.complete(transformResult.apply(rpcResult));
- };
+ // send requests concurrently to hedgedReadsFanout masters
+ private <T extends Message> void groupCall(CompletableFuture<T> future, int
startIndexInclusive,
Review comment:
I think this could use a comment explaining the logic; without one, it would
be difficult to follow.
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -61,53 +65,79 @@
/**
* Master based registry implementation. Makes RPCs to the configured master
addresses from config
* {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
- *
+ * <p/>
* It supports hedged reads, which can be enabled by setting
* {@value
org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to
True. Fan
* out the requests batch is controlled by
* {@value
org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}.
- *
+ * <p/>
* TODO: Handle changes to the configuration dynamically without having to
restart the client.
*/
@InterfaceAudience.Private
public class MasterRegistry implements ConnectionRegistry {
+
+ /** Configuration key that controls the fan out of requests **/
+ public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
+ "hbase.client.master_registry.hedged.fanout";
+
+ /** Default value for the fan out of hedged requests. **/
+ public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2;
+
private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
+ private final int hedgedReadFanOut;
+
// Configured list of masters to probe the meta information from.
- private final Set<ServerName> masterServers;
+ private final Set<ServerName> masterAddrs;
+
+ private final List<ClientMetaService.Interface> masterStubs;
// RPC client used to talk to the masters.
private final RpcClient rpcClient;
private final RpcControllerFactory rpcControllerFactory;
- private final int rpcTimeoutMs;
-
- MasterRegistry(Configuration conf) throws UnknownHostException {
- boolean hedgedReadsEnabled =
conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY,
- MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT);
- Configuration finalConf;
- if (!hedgedReadsEnabled) {
- // If hedged reads are disabled, it is equivalent to setting a fan out
of 1. We make a copy of
- // the configuration so that other places reusing this reference is not
affected.
- finalConf = new Configuration(conf);
- finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1);
- } else {
- finalConf = conf;
+
+ /**
+ * Parses the list of master addresses from the provided configuration.
Supported format is comma
+ * separated host[:port] values. If no port number if specified, default
master port is assumed.
+ * @param conf Configuration to parse from.
+ */
+ private static Set<ServerName> parseMasterAddrs(Configuration conf) throws
UnknownHostException {
+ Set<ServerName> masterAddrs = new HashSet<>();
+ String configuredMasters = getMasterAddr(conf);
+ for (String masterAddr :
configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
+ HostAndPort masterHostPort =
+
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
+ masterAddrs.add(ServerName.valueOf(masterHostPort.toString(),
ServerName.NON_STARTCODE));
Review comment:
Randomize the master addrs so that the client RPC pattern is
non-deterministic?
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -117,105 +147,97 @@ public static String getMasterAddr(Configuration conf)
throws UnknownHostExcepti
return String.format("%s:%d", hostname, port);
}
- /**
- * @return Stub needed to make RPC using a hedged channel to the master end
points.
- */
- private ClientMetaService.Interface getMasterStub() throws IOException {
- return ClientMetaService.newStub(
- rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(),
rpcTimeoutMs));
+ @FunctionalInterface
+ private interface Callable<T> {
Review comment:
nit: Add a comment that this is async, because of the way we are creating
the stubs, just to clarify?
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -117,105 +147,97 @@ public static String getMasterAddr(Configuration conf)
throws UnknownHostExcepti
return String.format("%s:%d", hostname, port);
}
- /**
- * @return Stub needed to make RPC using a hedged channel to the master end
points.
- */
- private ClientMetaService.Interface getMasterStub() throws IOException {
- return ClientMetaService.newStub(
- rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(),
rpcTimeoutMs));
+ @FunctionalInterface
+ private interface Callable<T> {
+ void call(HBaseRpcController controller, ClientMetaService.Interface stub,
RpcCallback<T> done);
}
- /**
- * Parses the list of master addresses from the provided configuration.
Supported format is
- * comma separated host[:port] values. If no port number if specified,
default master port is
- * assumed.
- * @param conf Configuration to parse from.
- */
- private void parseMasterAddrs(Configuration conf) throws
UnknownHostException {
- String configuredMasters = getMasterAddr(conf);
- for (String masterAddr:
configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
- HostAndPort masterHostPort =
-
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
- masterServers.add(ServerName.valueOf(masterHostPort.toString(),
ServerName.NON_STARTCODE));
- }
- Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master
address is needed");
+ private <T extends Message> CompletableFuture<T>
call(ClientMetaService.Interface stub,
+ Callable<T> callable) {
+ HBaseRpcController controller = rpcControllerFactory.newController();
+ CompletableFuture<T> future = new CompletableFuture<>();
+ callable.call(controller, stub, resp -> {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ future.complete(resp);
+ }
+ });
+ return future;
}
- @VisibleForTesting
- public Set<ServerName> getParsedMasterServers() {
- return Collections.unmodifiableSet(masterServers);
+ private IOException badResponse(String debug) {
+ return new IOException(String.format("Invalid result for request %s. Will
be retried", debug));
}
- /**
- * Returns a call back that can be passed along to the non-blocking rpc
call. It is invoked once
- * the rpc finishes and the response is propagated to the passed future.
- * @param future Result future to which the rpc response is propagated.
- * @param isValidResp Checks if the rpc response has a valid result.
- * @param transformResult Transforms the result to a different form as
expected by callers.
- * @param hrc RpcController instance for this rpc.
- * @param debug Debug message passed along to the caller in case of
exceptions.
- * @param <T> RPC result type.
- * @param <R> Transformed type of the result.
- * @return A call back that can be embedded in the non-blocking rpc call.
- */
- private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
- Predicate<T> isValidResp, Function<T, R> transformResult,
HBaseRpcController hrc,
- final String debug) {
- return rpcResult -> {
- if (rpcResult == null) {
- future.completeExceptionally(
- new MasterRegistryFetchException(masterServers, hrc.getFailed()));
- return;
- }
- if (!isValidResp.test(rpcResult)) {
- // Rpc returned ok, but result was malformed.
- future.completeExceptionally(new IOException(
- String.format("Invalid result for request %s. Will be retried",
debug)));
- return;
- }
- future.complete(transformResult.apply(rpcResult));
- };
+ // send requests concurrently to hedgedReadsFanout masters
+ private <T extends Message> void groupCall(CompletableFuture<T> future, int
startIndexInclusive,
+ Callable<T> callable, Predicate<T> isValidResp, String debug,
+ ConcurrentLinkedQueue<Throwable> errors) {
+ int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut,
masterStubs.size());
+ AtomicInteger remaining = new AtomicInteger(endIndexExclusive -
startIndexInclusive);
+ for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
+ addListener(call(masterStubs.get(i), callable), (r, e) -> {
+ // a simple check to skip all the later operations earlier
+ if (future.isDone()) {
+ return;
+ }
+ if (e == null && !isValidResp.test(r)) {
+ e = badResponse(debug);
+ }
+ if (e != null) {
+ // make sure when remaining reaches 0 we have all exceptions in the
errors queue
+ errors.add(e);
+ if (remaining.decrementAndGet() == 0) {
+ if (endIndexExclusive == masterStubs.size()) {
+ // we are done, complete the future with exception
+ RetriesExhaustedException ex = new
RetriesExhaustedException("masters",
Review comment:
nit: Technically these are not retries, right? Instead, wrap all the
errors into a MasterRegistryFetchException?
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -117,105 +147,97 @@ public static String getMasterAddr(Configuration conf)
throws UnknownHostExcepti
return String.format("%s:%d", hostname, port);
}
- /**
- * @return Stub needed to make RPC using a hedged channel to the master end
points.
- */
- private ClientMetaService.Interface getMasterStub() throws IOException {
- return ClientMetaService.newStub(
- rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(),
rpcTimeoutMs));
+ @FunctionalInterface
+ private interface Callable<T> {
+ void call(HBaseRpcController controller, ClientMetaService.Interface stub,
RpcCallback<T> done);
}
- /**
- * Parses the list of master addresses from the provided configuration.
Supported format is
- * comma separated host[:port] values. If no port number if specified,
default master port is
- * assumed.
- * @param conf Configuration to parse from.
- */
- private void parseMasterAddrs(Configuration conf) throws
UnknownHostException {
- String configuredMasters = getMasterAddr(conf);
- for (String masterAddr:
configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
- HostAndPort masterHostPort =
-
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
- masterServers.add(ServerName.valueOf(masterHostPort.toString(),
ServerName.NON_STARTCODE));
- }
- Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master
address is needed");
+ private <T extends Message> CompletableFuture<T>
call(ClientMetaService.Interface stub,
+ Callable<T> callable) {
+ HBaseRpcController controller = rpcControllerFactory.newController();
+ CompletableFuture<T> future = new CompletableFuture<>();
+ callable.call(controller, stub, resp -> {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ future.complete(resp);
+ }
+ });
+ return future;
}
- @VisibleForTesting
- public Set<ServerName> getParsedMasterServers() {
- return Collections.unmodifiableSet(masterServers);
+ private IOException badResponse(String debug) {
+ return new IOException(String.format("Invalid result for request %s. Will
be retried", debug));
}
- /**
- * Returns a call back that can be passed along to the non-blocking rpc
call. It is invoked once
- * the rpc finishes and the response is propagated to the passed future.
- * @param future Result future to which the rpc response is propagated.
- * @param isValidResp Checks if the rpc response has a valid result.
- * @param transformResult Transforms the result to a different form as
expected by callers.
- * @param hrc RpcController instance for this rpc.
- * @param debug Debug message passed along to the caller in case of
exceptions.
- * @param <T> RPC result type.
- * @param <R> Transformed type of the result.
- * @return A call back that can be embedded in the non-blocking rpc call.
- */
- private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
- Predicate<T> isValidResp, Function<T, R> transformResult,
HBaseRpcController hrc,
- final String debug) {
- return rpcResult -> {
- if (rpcResult == null) {
- future.completeExceptionally(
- new MasterRegistryFetchException(masterServers, hrc.getFailed()));
- return;
- }
- if (!isValidResp.test(rpcResult)) {
- // Rpc returned ok, but result was malformed.
- future.completeExceptionally(new IOException(
- String.format("Invalid result for request %s. Will be retried",
debug)));
- return;
- }
- future.complete(transformResult.apply(rpcResult));
- };
+ // send requests concurrently to hedgedReadsFanout masters
+ private <T extends Message> void groupCall(CompletableFuture<T> future, int
startIndexInclusive,
+ Callable<T> callable, Predicate<T> isValidResp, String debug,
+ ConcurrentLinkedQueue<Throwable> errors) {
+ int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut,
masterStubs.size());
+ AtomicInteger remaining = new AtomicInteger(endIndexExclusive -
startIndexInclusive);
+ for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
+ addListener(call(masterStubs.get(i), callable), (r, e) -> {
+ // a simple check to skip all the later operations earlier
+ if (future.isDone()) {
+ return;
+ }
+ if (e == null && !isValidResp.test(r)) {
+ e = badResponse(debug);
+ }
+ if (e != null) {
+ // make sure when remaining reaches 0 we have all exceptions in the
errors queue
+ errors.add(e);
+ if (remaining.decrementAndGet() == 0) {
+ if (endIndexExclusive == masterStubs.size()) {
+ // we are done, complete the future with exception
+ RetriesExhaustedException ex = new
RetriesExhaustedException("masters",
+ masterStubs.size(), new ArrayList<>(errors));
+ future.completeExceptionally(new
MasterRegistryFetchException(masterAddrs, ex));
+ } else {
+ groupCall(future, endIndexExclusive, callable, isValidResp,
debug, errors);
+ }
+ }
+ } else {
+ // do not need to decrement the counter any more as we have already
finished the future.
+ future.complete(r);
Review comment:
What happens to the hedged calls? Shouldn't they be canceled?
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
##########
@@ -117,105 +147,97 @@ public static String getMasterAddr(Configuration conf)
throws UnknownHostExcepti
return String.format("%s:%d", hostname, port);
}
- /**
- * @return Stub needed to make RPC using a hedged channel to the master end
points.
- */
- private ClientMetaService.Interface getMasterStub() throws IOException {
- return ClientMetaService.newStub(
- rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(),
rpcTimeoutMs));
+ @FunctionalInterface
+ private interface Callable<T> {
+ void call(HBaseRpcController controller, ClientMetaService.Interface stub,
RpcCallback<T> done);
}
- /**
- * Parses the list of master addresses from the provided configuration.
Supported format is
- * comma separated host[:port] values. If no port number if specified,
default master port is
- * assumed.
- * @param conf Configuration to parse from.
- */
- private void parseMasterAddrs(Configuration conf) throws
UnknownHostException {
- String configuredMasters = getMasterAddr(conf);
- for (String masterAddr:
configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
- HostAndPort masterHostPort =
-
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
- masterServers.add(ServerName.valueOf(masterHostPort.toString(),
ServerName.NON_STARTCODE));
- }
- Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master
address is needed");
+ private <T extends Message> CompletableFuture<T>
call(ClientMetaService.Interface stub,
+ Callable<T> callable) {
+ HBaseRpcController controller = rpcControllerFactory.newController();
+ CompletableFuture<T> future = new CompletableFuture<>();
+ callable.call(controller, stub, resp -> {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ future.complete(resp);
+ }
+ });
+ return future;
}
- @VisibleForTesting
- public Set<ServerName> getParsedMasterServers() {
- return Collections.unmodifiableSet(masterServers);
+ private IOException badResponse(String debug) {
+ return new IOException(String.format("Invalid result for request %s. Will
be retried", debug));
}
- /**
- * Returns a call back that can be passed along to the non-blocking rpc
call. It is invoked once
- * the rpc finishes and the response is propagated to the passed future.
- * @param future Result future to which the rpc response is propagated.
- * @param isValidResp Checks if the rpc response has a valid result.
- * @param transformResult Transforms the result to a different form as
expected by callers.
- * @param hrc RpcController instance for this rpc.
- * @param debug Debug message passed along to the caller in case of
exceptions.
- * @param <T> RPC result type.
- * @param <R> Transformed type of the result.
- * @return A call back that can be embedded in the non-blocking rpc call.
- */
- private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
- Predicate<T> isValidResp, Function<T, R> transformResult,
HBaseRpcController hrc,
- final String debug) {
- return rpcResult -> {
- if (rpcResult == null) {
- future.completeExceptionally(
- new MasterRegistryFetchException(masterServers, hrc.getFailed()));
- return;
- }
- if (!isValidResp.test(rpcResult)) {
- // Rpc returned ok, but result was malformed.
- future.completeExceptionally(new IOException(
- String.format("Invalid result for request %s. Will be retried",
debug)));
- return;
- }
- future.complete(transformResult.apply(rpcResult));
- };
+ // send requests concurrently to hedgedReadsFanout masters
+ private <T extends Message> void groupCall(CompletableFuture<T> future, int
startIndexInclusive,
+ Callable<T> callable, Predicate<T> isValidResp, String debug,
+ ConcurrentLinkedQueue<Throwable> errors) {
+ int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut,
masterStubs.size());
+ AtomicInteger remaining = new AtomicInteger(endIndexExclusive -
startIndexInclusive);
+ for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
+ addListener(call(masterStubs.get(i), callable), (r, e) -> {
+ // a simple check to skip all the later operations earlier
+ if (future.isDone()) {
+ return;
+ }
+ if (e == null && !isValidResp.test(r)) {
+ e = badResponse(debug);
Review comment:
nit: add a debug log?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]