HBASE-19536 Client side changes for moving peer modification from zk watcher to procedure
Signed-off-by: zhangduo <zhang...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7e27d59d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7e27d59d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7e27d59d Branch: refs/heads/HBASE-19397 Commit: 7e27d59d01bdd79415745055c67332c71f534bbd Parents: 427bce7 Author: Guanghao Zhang <zg...@apache.org> Authored: Tue Dec 19 15:50:57 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Dec 27 09:40:34 2017 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Admin.java | 87 ++++++++++- .../apache/hadoop/hbase/client/HBaseAdmin.java | 149 ++++++++++++++----- .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 82 +++++----- 3 files changed, 238 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/7e27d59d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index ff2722e..cf8e198 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -2463,7 +2463,7 @@ public interface Admin extends Abortable, Closeable { /** * Add a new replication peer for replicating data to slave cluster. * @param peerId a short name that identifies the peer - * @param peerConfig configuration for the replication slave cluster + * @param peerConfig configuration for the replication peer * @throws IOException if a remote or network exception occurs */ default void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) @@ -2474,7 +2474,7 @@ public interface Admin extends Abortable, Closeable { /** * Add a new replication peer for replicating data to slave cluster. * @param peerId a short name that identifies the peer - * @param peerConfig configuration for the replication slave cluster + * @param peerConfig configuration for the replication peer * @param enabled peer state, true if ENABLED and false if DISABLED * @throws IOException if a remote or network exception occurs */ @@ -2482,6 +2482,37 @@ public interface Admin extends Abortable, Closeable { throws IOException; /** + * Add a new replication peer but does not block and wait for it. + * <p> + * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @param peerConfig configuration for the replication peer + * @return the result of the async operation + * @throws IOException IOException if a remote or network exception occurs + */ + default Future<Void> addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig) + throws IOException { + return addReplicationPeerAsync(peerId, peerConfig, true); + } + + /** + * Add a new replication peer but does not block and wait for it. + * <p> + * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @param peerConfig configuration for the replication peer + * @param enabled peer state, true if ENABLED and false if DISABLED + * @return the result of the async operation + * @throws IOException IOException if a remote or network exception occurs + */ + Future<Void> addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig, + boolean enabled) throws IOException; + + /** * Remove a peer and stop the replication. * @param peerId a short name that identifies the peer * @throws IOException if a remote or network exception occurs @@ -2489,6 +2520,18 @@ public interface Admin extends Abortable, Closeable { void removeReplicationPeer(String peerId) throws IOException; /** + * Remove a replication peer but does not block and wait for it. + * <p> + * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @return the result of the async operation + * @throws IOException IOException if a remote or network exception occurs + */ + Future<Void> removeReplicationPeerAsync(String peerId) throws IOException; + + /** * Restart the replication stream to the specified peer. * @param peerId a short name that identifies the peer * @throws IOException if a remote or network exception occurs @@ -2496,6 +2539,18 @@ public interface Admin extends Abortable, Closeable { void enableReplicationPeer(String peerId) throws IOException; /** + * Enable a replication peer but does not block and wait for it. + * <p> + * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @return the result of the async operation + * @throws IOException IOException if a remote or network exception occurs + */ + Future<Void> enableReplicationPeerAsync(String peerId) throws IOException; + + /** * Stop the replication stream to the specified peer. * @param peerId a short name that identifies the peer * @throws IOException if a remote or network exception occurs @@ -2503,6 +2558,18 @@ public interface Admin extends Abortable, Closeable { void disableReplicationPeer(String peerId) throws IOException; /** + * Disable a replication peer but does not block and wait for it. + * <p> + * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @return the result of the async operation + * @throws IOException IOException if a remote or network exception occurs + */ + Future<Void> disableReplicationPeerAsync(String peerId) throws IOException; + + /** * Returns the configured ReplicationPeerConfig for the specified peer. * @param peerId a short name that identifies the peer * @return ReplicationPeerConfig for the peer @@ -2513,13 +2580,27 @@ public interface Admin extends Abortable, Closeable { /** * Update the peerConfig for the specified peer. * @param peerId a short name that identifies the peer - * @param peerConfig new config for the peer + * @param peerConfig new config for the replication peer * @throws IOException if a remote or network exception occurs */ void updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) throws IOException; /** + * Update the peerConfig for the specified peer but does not block and wait for it. + * <p> + * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw + * ExecutionException if there was an error while executing the operation or TimeoutException in + * case the wait timeout was not long enough to allow the operation to complete. + * @param peerId a short name that identifies the peer + * @param peerConfig new config for the replication peer + * @return the result of the async operation + * @throws IOException IOException if a remote or network exception occurs + */ + Future<Void> updateReplicationPeerConfigAsync(String peerId, ReplicationPeerConfig peerConfig) + throws IOException; + + /** * Append the replicable table column family config from the specified peer. * @param id a short that identifies the cluster * @param tableCfs A map from tableName to column family names http://git-wip-us.apache.org/repos/asf/hbase/blob/7e27d59d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index af3916d..5998fb2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -199,7 +200,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTa import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; /** @@ -3779,6 +3785,25 @@ public class HBaseAdmin implements Admin { } } + @InterfaceAudience.Private + @InterfaceStability.Evolving + private static class ReplicationFuture extends ProcedureFuture<Void> { + private final String peerId; + private final Supplier<String> getOperation; + + public ReplicationFuture(HBaseAdmin admin, String peerId, Long procId, + Supplier<String> getOperation) { + super(admin, procId); + this.peerId = peerId; + this.getOperation = getOperation; + } + + @Override + public String toString() { + return "Operation: " + getOperation.get() + ", peerId: " + peerId; + } + } + @Override public List<SecurityCapability> getSecurityCapabilities() throws IOException { try { @@ -3851,50 +3876,82 @@ public class HBaseAdmin implements Admin { @Override public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws IOException { - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { - @Override - protected Void rpcCall() throws Exception { - master.addReplicationPeer(getRpcController(), - RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled)); - return null; - } - }); + get(addReplicationPeerAsync(peerId, peerConfig, enabled), this.syncWaitTimeout, + TimeUnit.MILLISECONDS); + } + + @Override + public Future<Void> addReplicationPeerAsync(String peerId, ReplicationPeerConfig peerConfig, + boolean enabled) throws IOException { + AddReplicationPeerResponse response = executeCallable( + new MasterCallable<AddReplicationPeerResponse>(getConnection(), getRpcControllerFactory()) { + @Override + protected AddReplicationPeerResponse rpcCall() throws Exception { + return master.addReplicationPeer(getRpcController(), + RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled)); + } + }); + return new ReplicationFuture(this, peerId, response.getProcId(), () -> "ADD_REPLICATION_PEER"); } @Override public void removeReplicationPeer(String peerId) throws IOException { - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { - @Override - protected Void rpcCall() throws Exception { - master.removeReplicationPeer(getRpcController(), - RequestConverter.buildRemoveReplicationPeerRequest(peerId)); - return null; - } - }); + get(removeReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS); + } + + @Override + public Future<Void> removeReplicationPeerAsync(String peerId) throws IOException { + RemoveReplicationPeerResponse response = + executeCallable(new MasterCallable<RemoveReplicationPeerResponse>(getConnection(), + getRpcControllerFactory()) { + @Override + protected RemoveReplicationPeerResponse rpcCall() throws Exception { + return master.removeReplicationPeer(getRpcController(), + RequestConverter.buildRemoveReplicationPeerRequest(peerId)); + } + }); + return new ReplicationFuture(this, peerId, response.getProcId(), + () -> "REMOVE_REPLICATION_PEER"); } @Override public void enableReplicationPeer(final String peerId) throws IOException { - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { - @Override - protected Void rpcCall() throws Exception { - master.enableReplicationPeer(getRpcController(), - RequestConverter.buildEnableReplicationPeerRequest(peerId)); - return null; - } - }); + get(enableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS); + } + + @Override + public Future<Void> enableReplicationPeerAsync(final String peerId) throws IOException { + EnableReplicationPeerResponse response = + executeCallable(new MasterCallable<EnableReplicationPeerResponse>(getConnection(), + getRpcControllerFactory()) { + @Override + protected EnableReplicationPeerResponse rpcCall() throws Exception { + return master.enableReplicationPeer(getRpcController(), + RequestConverter.buildEnableReplicationPeerRequest(peerId)); + } + }); + return new ReplicationFuture(this, peerId, response.getProcId(), + () -> "ENABLE_REPLICATION_PEER"); } @Override public void disableReplicationPeer(final String peerId) throws IOException { - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { - @Override - protected Void rpcCall() throws Exception { - master.disableReplicationPeer(getRpcController(), - RequestConverter.buildDisableReplicationPeerRequest(peerId)); - return null; - } - }); + get(disableReplicationPeerAsync(peerId), this.syncWaitTimeout, TimeUnit.MILLISECONDS); + } + + @Override + public Future<Void> disableReplicationPeerAsync(final String peerId) throws IOException { + DisableReplicationPeerResponse response = + executeCallable(new MasterCallable<DisableReplicationPeerResponse>(getConnection(), + getRpcControllerFactory()) { + @Override + protected DisableReplicationPeerResponse rpcCall() throws Exception { + return master.disableReplicationPeer(getRpcController(), + RequestConverter.buildDisableReplicationPeerRequest(peerId)); + } + }); + return new ReplicationFuture(this, peerId, response.getProcId(), + () -> "DISABLE_REPLICATION_PEER"); } @Override @@ -3913,14 +3970,24 @@ public class HBaseAdmin implements Admin { @Override public void updateReplicationPeerConfig(final String peerId, final ReplicationPeerConfig peerConfig) throws IOException { - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { - @Override - protected Void rpcCall() throws Exception { - master.updateReplicationPeerConfig(getRpcController(), - RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig)); - return null; - } - }); + get(updateReplicationPeerConfigAsync(peerId, peerConfig), this.syncWaitTimeout, + TimeUnit.MILLISECONDS); + } + + @Override + public Future<Void> updateReplicationPeerConfigAsync(final String peerId, + final ReplicationPeerConfig peerConfig) throws IOException { + UpdateReplicationPeerConfigResponse response = + executeCallable(new MasterCallable<UpdateReplicationPeerConfigResponse>(getConnection(), + getRpcControllerFactory()) { + @Override + protected UpdateReplicationPeerConfigResponse rpcCall() throws Exception { + return master.updateReplicationPeerConfig(getRpcController(), + RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig)); + } + }); + return new ReplicationFuture(this, peerId, response.getProcId(), + () -> "UPDATE_REPLICATION_PEER_CONFIG"); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/7e27d59d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index dac83f3..e8ab786 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -1522,47 +1523,34 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<Void> addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) { - return this - .<Void> newMasterCaller() - .action( - (controller, stub) -> this - .<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub, - RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), (s, - c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call(); + return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall( + RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), + (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(), + new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER")); } @Override public CompletableFuture<Void> removeReplicationPeer(String peerId) { - return this - .<Void> newMasterCaller() - .action( - (controller, stub) -> this - .<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse, Void> call(controller, - stub, RequestConverter.buildRemoveReplicationPeerRequest(peerId), - (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> null)).call(); + return this.<RemoveReplicationPeerRequest, RemoveReplicationPeerResponse> procedureCall( + RequestConverter.buildRemoveReplicationPeerRequest(peerId), + (s, c, req, done) -> s.removeReplicationPeer(c, req, done), (resp) -> resp.getProcId(), + new ReplicationProcedureBiConsumer(peerId, () -> "REMOVE_REPLICATION_PEER")); } @Override public CompletableFuture<Void> enableReplicationPeer(String peerId) { - return this - .<Void> newMasterCaller() - .action( - (controller, stub) -> this - .<EnableReplicationPeerRequest, EnableReplicationPeerResponse, Void> call(controller, - stub, RequestConverter.buildEnableReplicationPeerRequest(peerId), - (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> null)).call(); + return this.<EnableReplicationPeerRequest, EnableReplicationPeerResponse> procedureCall( + RequestConverter.buildEnableReplicationPeerRequest(peerId), + (s, c, req, done) -> s.enableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), + new ReplicationProcedureBiConsumer(peerId, () -> "ENABLE_REPLICATION_PEER")); } @Override public CompletableFuture<Void> disableReplicationPeer(String peerId) { - return this - .<Void> newMasterCaller() - .action( - (controller, stub) -> this - .<DisableReplicationPeerRequest, DisableReplicationPeerResponse, Void> call( - controller, stub, RequestConverter.buildDisableReplicationPeerRequest(peerId), (s, - c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> null)) - .call(); + return this.<DisableReplicationPeerRequest, DisableReplicationPeerResponse> procedureCall( + RequestConverter.buildDisableReplicationPeerRequest(peerId), + (s, c, req, done) -> s.disableReplicationPeer(c, req, done), (resp) -> resp.getProcId(), + new ReplicationProcedureBiConsumer(peerId, () -> "DISABLE_REPLICATION_PEER")); } @Override @@ -1581,13 +1569,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { public CompletableFuture<Void> updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig) { return this - .<Void> newMasterCaller() - .action( - (controller, stub) -> this - .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse, Void> call( - controller, stub, RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, - peerConfig), (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), ( - resp) -> null)).call(); + .<UpdateReplicationPeerConfigRequest, UpdateReplicationPeerConfigResponse> procedureCall( + RequestConverter.buildUpdateReplicationPeerConfigRequest(peerId, peerConfig), + (s, c, req, done) -> s.updateReplicationPeerConfig(c, req, done), + (resp) -> resp.getProcId(), + new ReplicationProcedureBiConsumer(peerId, () -> "UPDATE_REPLICATION_PEER_CONFIG")); } @Override @@ -2546,6 +2532,30 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } + private class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { + private final String peerId; + private final Supplier<String> getOperation; + + ReplicationProcedureBiConsumer(String peerId, Supplier<String> getOperation) { + this.peerId = peerId; + this.getOperation = getOperation; + } + + String getDescription() { + return "Operation: " + getOperation.get() + ", peerId: " + peerId; + } + + @Override + void onFinished() { + LOG.info(getDescription() + " completed"); + } + + @Override + void onError(Throwable error) { + LOG.info(getDescription() + " failed with " + error.getMessage()); + } + } + private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { CompletableFuture<Void> future = new CompletableFuture<>(); procFuture.whenComplete((procId, error) -> {