This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 5f291e82436c767a21eb16a08a57ce6c49395302 Author: Duo Zhang <zhang...@apache.org> AuthorDate: Fri Jan 11 16:22:24 2019 +0800 HBASE-21671 Rewrite RegionReplicaReplicationEndpoint to use AsyncClusterConnection --- .../hadoop/hbase/client/AsyncConnectionImpl.java | 24 +- .../hbase/client/AsyncClusterConnection.java | 17 + .../hbase/client/AsyncClusterConnectionImpl.java | 80 +++ .../AsyncRegionReplicaReplayRetryingCaller.java | 146 ++++ .../hbase/client/AsyncRegionServerAdmin.java | 5 +- .../hbase/client/ClusterConnectionFactory.java | 2 +- .../hbase/protobuf/ReplicationProtbufUtil.java | 31 +- .../handler/RegionReplicaFlushHandler.java | 3 +- .../hbase/replication/ReplicationEndpoint.java | 35 +- .../RegionReplicaReplicationEndpoint.java | 782 +++++++-------------- .../regionserver/ReplicationSource.java | 2 +- .../hbase/client/TestAsyncTableNoncedRetry.java | 2 +- .../TestRegionReplicaReplicationEndpoint.java | 56 +- ...stRegionReplicaReplicationEndpointNoMaster.java | 99 ++- 14 files changed, 627 insertions(+), 657 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index be73dd8..9adad74 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -55,7 +55,6 @@ import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse; @@ -65,7 +64,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ * The implementation of AsyncConnection. 
*/ @InterfaceAudience.Private -class AsyncConnectionImpl implements AsyncClusterConnection { +class AsyncConnectionImpl implements AsyncConnection { private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class); @@ -85,7 +84,7 @@ class AsyncConnectionImpl implements AsyncClusterConnection { private final int rpcTimeout; - private final RpcClient rpcClient; + protected final RpcClient rpcClient; final RpcControllerFactory rpcControllerFactory; @@ -160,16 +159,10 @@ class AsyncConnectionImpl implements AsyncClusterConnection { } // ditto - @Override - public NonceGenerator getNonceGenerator() { + NonceGenerator getNonceGenerator() { return nonceGenerator; } - @Override - public RpcClient getRpcClient() { - return rpcClient; - } - private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException { return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); } @@ -360,15 +353,4 @@ class AsyncConnectionImpl implements AsyncClusterConnection { return new HBaseHbck(MasterProtos.HbckService.newBlockingStub( rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory); } - - public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) { - return new AsyncRegionServerAdmin(serverName, this); - } - - @Override - public CompletableFuture<FlushRegionResponse> flush(byte[] regionName, - boolean writeFlushWALMarker) { - RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin(); - return admin.flushRegionInternal(regionName, writeFlushWALMarker); - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java similarity index 72% rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java index f1f64ca..0ad77ba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java @@ -17,9 +17,13 @@ */ package org.apache.hadoop.hbase.client; +import java.util.List; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; @@ -49,4 +53,17 @@ public interface AsyncClusterConnection extends AsyncConnection { * Flush a region and get the response. */ CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker); + + /** + * Replicate wal edits for replica regions. The return value is the edits we skipped, as the + * original return value is useless. + */ + CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row, + List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs); + + /** + * Return all the replicas for a region. Used for regiong replica replication. 
+ */ + CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row, + boolean reload); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java new file mode 100644 index 0000000..d61f01f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.net.SocketAddress; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; + +/** + * The implementation of AsyncClusterConnection. 
+ */ +@InterfaceAudience.Private +class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClusterConnection { + + public AsyncClusterConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId, + SocketAddress localAddress, User user) { + super(conf, registry, clusterId, localAddress, user); + } + + @Override + public NonceGenerator getNonceGenerator() { + return super.getNonceGenerator(); + } + + @Override + public RpcClient getRpcClient() { + return rpcClient; + } + + @Override + public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) { + return new AsyncRegionServerAdmin(serverName, this); + } + + @Override + public CompletableFuture<FlushRegionResponse> flush(byte[] regionName, + boolean writeFlushWALMarker) { + RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin(); + return admin.flushRegionInternal(regionName, writeFlushWALMarker); + } + + @Override + public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row, + List<Entry> entries, int replicaId, int retries, long operationTimeoutNs) { + return new AsyncRegionReplicaReplayRetryingCaller(RETRY_TIMER, this, + ConnectionUtils.retries2Attempts(retries), operationTimeoutNs, tableName, encodedRegionName, + row, entries, replicaId).call(); + } + + @Override + public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row, + boolean reload) { + return getLocator().getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java new file mode 100644 index 0000000..3364f15 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; + +/** + * For replaying edits for region replica. + * <p/> + * The mainly difference here is that, every time after locating, we will check whether the region + * name is equal, if not, we will give up, as this usually means the region has been split or + * merged, and the new region(s) should already have all the data of the parent region(s). + * <p/> + * Notice that, the return value is the edits we skipped, as the original response message is not + * used at upper layer. + */ +@InterfaceAudience.Private +public class AsyncRegionReplicaReplayRetryingCaller extends AsyncRpcRetryingCaller<Long> { + + private static final Logger LOG = + LoggerFactory.getLogger(AsyncRegionReplicaReplayRetryingCaller.class); + + private final TableName tableName; + + private final byte[] encodedRegionName; + + private final byte[] row; + + private final Entry[] entries; + + private final int replicaId; + + public AsyncRegionReplicaReplayRetryingCaller(HashedWheelTimer retryTimer, + AsyncClusterConnectionImpl conn, int maxAttempts, long operationTimeoutNs, + TableName tableName, byte[] encodedRegionName, byte[] row, List<Entry> entries, + int replicaId) { + super(retryTimer, conn, conn.connConf.getPauseNs(), maxAttempts, operationTimeoutNs, + conn.connConf.getWriteRpcTimeoutNs(), conn.connConf.getStartLogErrorsCnt()); + this.tableName = tableName; + this.encodedRegionName = encodedRegionName; + this.row = row; + this.entries = entries.toArray(new Entry[0]); + this.replicaId = replicaId; + } + + private void call(HRegionLocation loc) { + if (!Bytes.equals(encodedRegionName, loc.getRegion().getEncodedNameAsBytes())) { + if (LOG.isTraceEnabled()) { + LOG.trace( + "Skipping {} entries in table {} because located region {} is different than" + + " the original region {} from WALEdit", + entries.length, tableName, loc.getRegion().getEncodedName(), + Bytes.toStringBinary(encodedRegionName)); + for (Entry entry : entries) { + LOG.trace("Skipping : " + entry); + } + } + future.complete(Long.valueOf(entries.length)); + return; + } + + AdminService.Interface stub; + try { + stub = conn.getAdminStub(loc.getServerName()); + } catch (IOException e) { + onError(e, + () -> "Get async admin stub to " + loc.getServerName() + " for '" + + Bytes.toStringBinary(row) + "' in " + loc.getRegion().getEncodedName() + " of " + + tableName + " failed", + err -> conn.getLocator().updateCachedLocationOnError(loc, err)); + return; + } + Pair<ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil + .buildReplicateWALEntryRequest(entries, encodedRegionName, null, null, null); + resetCallTimeout(); + controller.setCellScanner(p.getSecond()); + stub.replay(controller, 
p.getFirst(), r -> { + if (controller.failed()) { + onError(controller.getFailed(), + () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " + + loc.getRegion().getEncodedName() + " of " + tableName + " failed", + err -> conn.getLocator().updateCachedLocationOnError(loc, err)); + } else { + future.complete(0L); + } + }); + + } + + @Override + protected void doCall() { + long locateTimeoutNs; + if (operationTimeoutNs > 0) { + locateTimeoutNs = remainingTimeNs(); + if (locateTimeoutNs <= 0) { + completeExceptionally(); + return; + } + } else { + locateTimeoutNs = -1L; + } + addListener(conn.getLocator().getRegionLocation(tableName, row, replicaId, + RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> { + if (error != null) { + onError(error, + () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> { + }); + return; + } + call(loc); + }); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java similarity index 99% rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java index b9141a9..d491890 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java @@ -164,8 +164,9 @@ public class AsyncRegionServerAdmin { cellScanner); } - public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) { - return call((stub, controller, done) -> stub.replay(controller, request, done)); + public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request, + CellScanner cellScanner) { + return call((stub, controller, done) -> stub.replay(controller, request, done), cellScanner); } public CompletableFuture<RollWALWriterResponse> rollWALWriter(RollWALWriterRequest request) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java similarity index 95% rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java index 79484db..2670420 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java @@ -46,6 +46,6 @@ public final class ClusterConnectionFactory { SocketAddress localAddress, User user) throws IOException { AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf); String clusterId = FutureUtils.get(registry.getClusterId()); - return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user); + return new AsyncClusterConnectionImpl(conf, registry, clusterId, localAddress, user); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 9f41a76..c39c86c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -37,7 +37,8 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @InterfaceAudience.Private @@ -55,20 +56,18 @@ public class ReplicationProtbufUtil { public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) throws IOException { - Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest( - entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir); + Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null, + replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir); FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond())); } /** * Create a new ReplicateWALEntryRequest from a list of WAL entries - * * @param entries the WAL entries to be replicated - * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values - * found. + * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found. */ - public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> - buildReplicateWALEntryRequest(final Entry[] entries) throws IOException { + public static Pair<ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest( + final Entry[] entries) { return buildReplicateWALEntryRequest(entries, null, null, null, null); } @@ -82,16 +81,14 @@ public class ReplicationProtbufUtil { * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found. */ - public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> - buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName, - String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) - throws IOException { + public static Pair<ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest( + final Entry[] entries, byte[] encodedRegionName, String replicationClusterId, + Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) { // Accumulate all the Cells seen in here. List<List<? 
extends Cell>> allCells = new ArrayList<>(entries.length); int size = 0; - AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder(); - AdminProtos.ReplicateWALEntryRequest.Builder builder = - AdminProtos.ReplicateWALEntryRequest.newBuilder(); + WALEntry.Builder entryBuilder = WALEntry.newBuilder(); + ReplicateWALEntryRequest.Builder builder = ReplicateWALEntryRequest.newBuilder(); for (Entry entry: entries) { entryBuilder.clear(); @@ -99,8 +96,8 @@ public class ReplicationProtbufUtil { try { keyBuilder = entry.getKey().getBuilder(WALCellCodec.getNoneCompressor()); } catch (IOException e) { - throw new IOException( - "There should not throw exception since NoneCompressor do not throw any exceptions", e); + throw new AssertionError( + "There should not throw exception since NoneCompressor do not throw any exceptions", e); } if(encodedRegionName != null){ keyBuilder.setEncodedRegionName( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java index 0729203..cc798cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java @@ -185,7 +185,6 @@ public class RegionReplicaFlushHandler extends EventHandler { "Was not able to trigger a flush from primary region due to old server version? " + "Continuing to open the secondary region replica: " + region.getRegionInfo().getRegionNameAsString()); - region.setReadsEnabled(true); break; } } @@ -195,6 +194,6 @@ public class RegionReplicaFlushHandler extends EventHandler { throw new InterruptedIOException(e.getMessage()); } } + region.setReadsEnabled(true); } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index f4c37b1..ca73663 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -29,6 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; @@ -53,6 +54,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) class Context { + private final Server server; private final Configuration localConf; private final Configuration conf; private final FileSystem fs; @@ -64,16 +66,11 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { private final Abortable abortable; @InterfaceAudience.Private - public Context( - final Configuration localConf, - final Configuration conf, - final FileSystem fs, - final String peerId, - final UUID clusterId, - final ReplicationPeer replicationPeer, - final MetricsSource metrics, - final TableDescriptors tableDescriptors, - final Abortable abortable) { + public Context(final Server server, final Configuration localConf, final 
Configuration conf, + final FileSystem fs, final String peerId, final UUID clusterId, + final ReplicationPeer replicationPeer, final MetricsSource metrics, + final TableDescriptors tableDescriptors, final Abortable abortable) { + this.server = server; this.localConf = localConf; this.conf = conf; this.fs = fs; @@ -84,34 +81,50 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { this.tableDescriptors = tableDescriptors; this.abortable = abortable; } + + public Server getServer() { + return server; + } + public Configuration getConfiguration() { return conf; } + public Configuration getLocalConfiguration() { return localConf; } + public FileSystem getFilesystem() { return fs; } + public UUID getClusterId() { return clusterId; } + public String getPeerId() { return peerId; } + public ReplicationPeerConfig getPeerConfig() { return replicationPeer.getPeerConfig(); } + public ReplicationPeer getReplicationPeer() { return replicationPeer; } + public MetricsSource getMetrics() { return metrics; } + public TableDescriptors getTableDescriptors() { return tableDescriptors; } - public Abortable getAbortable() { return abortable; } + + public Abortable getAbortable() { + return abortable; + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index f7721e0..65cf9a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -19,67 +19,47 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; -import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; +import java.util.Optional; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.RegionAdminServiceCallable; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.RegionReplicaUtil; -import org.apache.hadoop.hbase.client.RetryingCallable; -import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import 
org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.util.AtomicUtils; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers; -import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink; -import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; -import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; -import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter; -import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.cache.Cache; import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; - -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; +import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; /** - * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint - * which receives the WAL edits from the WAL, and sends the edits to replicas - * of regions. + * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint which receives the WAL + * edits from the WAL, and sends the edits to replicas of regions. 
*/ @InterfaceAudience.Private public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { @@ -87,32 +67,55 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class); // Can be configured differently than hbase.client.retries.number - private static String CLIENT_RETRIES_NUMBER - = "hbase.region.replica.replication.client.retries.number"; + private static String CLIENT_RETRIES_NUMBER = + "hbase.region.replica.replication.client.retries.number"; private Configuration conf; - private ClusterConnection connection; + private AsyncClusterConnection connection; private TableDescriptors tableDescriptors; - // Reuse WALSplitter constructs as a WAL pipe - private PipelineController controller; - private RegionReplicaOutputSink outputSink; - private EntryBuffers entryBuffers; + private int numRetries; + + private long operationTimeoutNs; - // Number of writer threads - private int numWriterThreads; + private LoadingCache<TableName, Optional<TableDescriptor>> tableDescriptorCache; - private int operationTimeout; + private Cache<TableName, TableName> disabledTableCache; - private ExecutorService pool; + private final RetryCounterFactory retryCounterFactory = + new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60000); @Override public void init(Context context) throws IOException { super.init(context); - - this.conf = HBaseConfiguration.create(context.getConfiguration()); + this.conf = context.getConfiguration(); this.tableDescriptors = context.getTableDescriptors(); - + int memstoreReplicationEnabledCacheExpiryMs = conf + .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000); + // A cache for the table "memstore replication enabled" flag. + // It has a default expiry of 5 sec. This means that if the table is altered + // with a different flag value, we might miss to replicate for that amount of + // time. But this cache avoid the slow lookup and parsing of the TableDescriptor. + tableDescriptorCache = CacheBuilder.newBuilder() + .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS) + .initialCapacity(10).maximumSize(1000) + .build(new CacheLoader<TableName, Optional<TableDescriptor>>() { + + @Override + public Optional<TableDescriptor> load(TableName tableName) throws Exception { + // check if the table requires memstore replication + // some unit-test drop the table, so we should do a bypass check and always replicate. + return Optional.ofNullable(tableDescriptors.get(tableName)); + } + }); + int nonExistentTableCacheExpiryMs = + conf.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000); + // A cache for non existing tables that have a default expiry of 5 sec. This means that if the + // table is created again with the same name, we might miss to replicate for that amount of + // time. But this cache prevents overloading meta requests for every edit from a deleted file. + disabledTableCache = CacheBuilder.newBuilder() + .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS).initialCapacity(10) + .maximumSize(1000).build(); // HRS multiplies client retries by 10 globally for meta operations, but we do not want this. // We are resetting it here because we want default number of retries (35) rather than 10 times // that which makes very long retries for disabled tables etc. 
@@ -123,516 +126,261 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER); defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already } - - conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); - int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries); - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries); - - this.numWriterThreads = this.conf.getInt( - "hbase.region.replica.replication.writer.threads", 3); - controller = new PipelineController(); - entryBuffers = new EntryBuffers(controller, - this.conf.getLong("hbase.region.replica.replication.buffersize", 128 * 1024 * 1024)); - + this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries); // use the regular RPC timeout for replica replication RPC's - this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - } - - @Override - protected void doStart() { - try { - connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf); - this.pool = getDefaultThreadPool(conf); - outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers, - connection, pool, numWriterThreads, operationTimeout); - outputSink.startWriterThreads(); - super.doStart(); - } catch (IOException ex) { - LOG.warn("Received exception while creating connection :" + ex); - notifyFailed(ex); - } - } - - @Override - protected void doStop() { - if (outputSink != null) { - try { - outputSink.finishWritingAndClose(); - } catch (IOException ex) { - LOG.warn("Got exception while trying to close OutputSink", ex); - } - } - if (this.pool != null) { - this.pool.shutdownNow(); - try { - // wait for 10 sec - boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS); - if (!shutdown) { - LOG.warn("Failed to shutdown the thread pool after 10 seconds"); - } - } catch (InterruptedException e) { - LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e); - } - } - if (connection != null) { - try { - connection.close(); - } catch (IOException ex) { - LOG.warn("Got exception closing connection :" + ex); - } - } - super.doStop(); + this.operationTimeoutNs = + TimeUnit.MILLISECONDS.toNanos(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); + this.connection = context.getServer().getAsyncClusterConnection(); } /** - * Returns a Thread pool for the RPC's to region replicas. Similar to - * Connection's thread pool. + * returns true if the specified entry must be replicated. We should always replicate meta + * operations (e.g. flush) and use the user HTD flag to decide whether or not replicate the + * memstore. 
*/ - private ExecutorService getDefaultThreadPool(Configuration conf) { - int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256); - if (maxThreads == 0) { - maxThreads = Runtime.getRuntime().availableProcessors() * 8; + private boolean requiresReplication(Optional<TableDescriptor> tableDesc, Entry entry) { + // empty edit does not need to be replicated + if (entry.getEdit().isEmpty() || !tableDesc.isPresent()) { + return false; } - long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60); - LinkedBlockingQueue<Runnable> workQueue = - new LinkedBlockingQueue<>(maxThreads * - conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, - HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); - ThreadPoolExecutor tpe = new ThreadPoolExecutor( - maxThreads, - maxThreads, - keepAliveTime, - TimeUnit.SECONDS, - workQueue, - Threads.newDaemonThreadFactory(this.getClass().getSimpleName() + "-rpc-shared-")); - tpe.allowCoreThreadTimeOut(true); - return tpe; + // meta edits (e.g. flush) must be always replicated + return entry.getEdit().isMetaEdit() || tableDesc.get().hasRegionMemStoreReplication(); } - @Override - public boolean replicate(ReplicateContext replicateContext) { - /* A note on batching in RegionReplicaReplicationEndpoint (RRRE): - * - * RRRE relies on batching from two different mechanisms. The first is the batching from - * ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS reads from a single - * WAL file filling up a buffer of heap size "replication.source.size.capacity"(64MB) or at most - * "replication.source.nb.capacity" entries or until it sees the end of file (in live tailing). - * Then RS passes all the buffered edits in this replicate() call context. RRRE puts the edits - * to the WALSplitter.EntryBuffers which is a blocking buffer space of up to - * "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits - * based on regions. - * - * There are "hbase.region.replica.replication.writer.threads"(default 3) writer threads which - * pick largest per-region buffer and send it to the SinkWriter (see RegionReplicaOutputSink). - * The SinkWriter in this case will send the wal edits to all secondary region replicas in - * parallel via a retrying rpc call. EntryBuffers guarantees that while a buffer is - * being written to the sink, another buffer for the same region will not be made available to - * writers ensuring regions edits are not replayed out of order. - * - * The replicate() call won't return until all the buffers are sent and ack'd by the sinks so - * that the replication can assume all edits are persisted. We may be able to do a better - * pipelining between the replication thread and output sinks later if it becomes a bottleneck. 
- */ - - while (this.isRunning()) { - try { - for (Entry entry: replicateContext.getEntries()) { - entryBuffers.appendEntry(entry); + private void getRegionLocations(CompletableFuture<RegionLocations> future, + TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, boolean reload) { + FutureUtils.addListener(connection.getRegionLocations(tableDesc.getTableName(), row, reload), + (r, e) -> { + if (e != null) { + future.completeExceptionally(e); + return; } - outputSink.flush(); // make sure everything is flushed - ctx.getMetrics().incrLogEditsFiltered( - outputSink.getSkippedEditsCounter().getAndSet(0)); - return true; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } catch (IOException e) { - LOG.warn("Received IOException while trying to replicate" - + StringUtils.stringifyException(e)); - } - } - - return false; - } - - @Override - public boolean canReplicateToSameCluster() { - return true; - } - - @Override - protected WALEntryFilter getScopeWALEntryFilter() { - // we do not care about scope. We replicate everything. - return null; + // if we are not loading from cache, just return + if (reload) { + future.complete(r); + return; + } + // check if the number of region replicas is correct, and also the primary region name + // matches + if (r.size() == tableDesc.getRegionReplication() && Bytes.equals( + r.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(), encodedRegionName)) { + future.complete(r); + } else { + // reload again as the information in cache maybe stale + getRegionLocations(future, tableDesc, encodedRegionName, row, true); + } + }); } - static class RegionReplicaOutputSink extends OutputSink { - private final RegionReplicaSinkWriter sinkWriter; - private final TableDescriptors tableDescriptors; - private final Cache<TableName, Boolean> memstoreReplicationEnabled; - - public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors, - EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool, - int numWriters, int operationTimeout) { - super(controller, entryBuffers, numWriters); - this.sinkWriter = - new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors); - this.tableDescriptors = tableDescriptors; - - // A cache for the table "memstore replication enabled" flag. - // It has a default expiry of 5 sec. This means that if the table is altered - // with a different flag value, we might miss to replicate for that amount of - // time. But this cache avoid the slow lookup and parsing of the TableDescriptor. - int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration() - .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000); - this.memstoreReplicationEnabled = CacheBuilder.newBuilder() - .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS) - .initialCapacity(10) - .maximumSize(1000) - .build(); + private void replicate(CompletableFuture<Long> future, RegionLocations locs, + TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, List<Entry> entries) { + if (locs.size() == 1) { + // Could this happen? + future.complete(Long.valueOf(entries.size())); + return; } - - @Override - public void append(RegionEntryBuffer buffer) throws IOException { - List<Entry> entries = buffer.getEntryBuffer(); - - if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) { - return; - } - - // meta edits (e.g. flush) are always replicated. 
- // data edits (e.g. put) are replicated if the table requires them. - if (!requiresReplication(buffer.getTableName(), entries)) { - return; + if (!Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(), + encodedRegionName)) { + // the region name is not equal, this usually means the region has been split or merged, so + // give up replicating as the new region(s) should already have all the data of the parent + // region(s). + if (LOG.isTraceEnabled()) { + LOG.trace( + "Skipping {} entries in table {} because located region {} is different than" + + " the original region {} from WALEdit", + tableDesc.getTableName(), locs.getDefaultRegionLocation().getRegion().getEncodedName(), + Bytes.toStringBinary(encodedRegionName)); } - - sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(), - CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries); - } - - @Override - public boolean flush() throws IOException { - // nothing much to do for now. Wait for the Writer threads to finish up - // append()'ing the data. - entryBuffers.waitUntilDrained(); - return super.flush(); - } - - @Override - public boolean keepRegionEvent(Entry entry) { - return true; - } - - @Override - public List<Path> finishWritingAndClose() throws IOException { - finishWriting(true); - return null; - } - - @Override - public Map<byte[], Long> getOutputCounts() { - return null; // only used in tests - } - - @Override - public int getNumberOfRecoveredRegions() { - return 0; - } - - AtomicLong getSkippedEditsCounter() { - return skippedEdits; + future.complete(Long.valueOf(entries.size())); + return; } - - /** - * returns true if the specified entry must be replicated. - * We should always replicate meta operations (e.g. flush) - * and use the user HTD flag to decide whether or not replicate the memstore. - */ - private boolean requiresReplication(final TableName tableName, final List<Entry> entries) - throws IOException { - // unit-tests may not the TableDescriptors, bypass the check and always replicate - if (tableDescriptors == null) return true; - - Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName); - if (requiresReplication == null) { - // check if the table requires memstore replication - // some unit-test drop the table, so we should do a bypass check and always replicate. - TableDescriptor htd = tableDescriptors.get(tableName); - requiresReplication = htd == null || htd.hasRegionMemStoreReplication(); - memstoreReplicationEnabled.put(tableName, requiresReplication); - } - - // if memstore replication is not required, check the entries. - // meta edits (e.g. flush) must be always replicated. 
- if (!requiresReplication) { - int skipEdits = 0; - java.util.Iterator<Entry> it = entries.iterator(); - while (it.hasNext()) { - Entry entry = it.next(); - if (entry.getEdit().isMetaEdit()) { - requiresReplication = true; + AtomicReference<Throwable> error = new AtomicReference<>(); + AtomicInteger remainingTasks = new AtomicInteger(locs.size() - 1); + AtomicLong skippedEdits = new AtomicLong(0); + + for (int i = 1, n = locs.size(); i < n; i++) { + final int replicaId = i; + FutureUtils.addListener(connection.replay(tableDesc.getTableName(), + locs.getRegionLocation(replicaId).getRegion().getEncodedNameAsBytes(), row, entries, + replicaId, numRetries, operationTimeoutNs), (r, e) -> { + if (e != null) { + LOG.warn("Failed to replicate to {}", locs.getRegionLocation(replicaId), e); + error.compareAndSet(null, e); } else { - it.remove(); - skipEdits++; + AtomicUtils.updateMax(skippedEdits, r.longValue()); } - } - skippedEdits.addAndGet(skipEdits); - } - return requiresReplication; + if (remainingTasks.decrementAndGet() == 0) { + if (error.get() != null) { + future.completeExceptionally(error.get()); + } else { + future.complete(skippedEdits.get()); + } + } + }); } } - static class RegionReplicaSinkWriter extends SinkWriter { - RegionReplicaOutputSink sink; - ClusterConnection connection; - RpcControllerFactory rpcControllerFactory; - RpcRetryingCallerFactory rpcRetryingCallerFactory; - int operationTimeout; - ExecutorService pool; - Cache<TableName, Boolean> disabledAndDroppedTables; - TableDescriptors tableDescriptors; - - public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection, - ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) { - this.sink = sink; - this.connection = connection; - this.operationTimeout = operationTimeout; - this.rpcRetryingCallerFactory - = RpcRetryingCallerFactory.instantiate(connection.getConfiguration()); - this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration()); - this.pool = pool; - this.tableDescriptors = tableDescriptors; - - int nonExistentTableCacheExpiryMs = connection.getConfiguration() - .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000); - // A cache for non existing tables that have a default expiry of 5 sec. This means that if the - // table is created again with the same name, we might miss to replicate for that amount of - // time. But this cache prevents overloading meta requests for every edit from a deleted file. 
- disabledAndDroppedTables = CacheBuilder.newBuilder() - .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS) - .initialCapacity(10) - .maximumSize(1000) - .build(); + private void logSkipped(TableName tableName, List<Entry> entries, String reason) { + if (LOG.isTraceEnabled()) { + LOG.trace("Skipping {} entries because table {} is {}", entries.size(), tableName, reason); + for (Entry entry : entries) { + LOG.trace("Skipping : {}", entry); + } } + } - public void append(TableName tableName, byte[] encodedRegionName, byte[] row, - List<Entry> entries) throws IOException { - - if (disabledAndDroppedTables.getIfPresent(tableName) != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("Skipping " + entries.size() + " entries because table " + tableName - + " is cached as a disabled or dropped table"); - for (Entry entry : entries) { - LOG.trace("Skipping : " + entry); - } - } - sink.getSkippedEditsCounter().addAndGet(entries.size()); - return; + private CompletableFuture<Long> replicate(TableDescriptor tableDesc, byte[] encodedRegionName, + List<Entry> entries) { + if (disabledTableCache.getIfPresent(tableDesc.getTableName()) != null) { + logSkipped(tableDesc.getTableName(), entries, "cached as a disabled table"); + return CompletableFuture.completedFuture(Long.valueOf(entries.size())); + } + byte[] row = CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)); + CompletableFuture<RegionLocations> locateFuture = new CompletableFuture<>(); + getRegionLocations(locateFuture, tableDesc, encodedRegionName, row, false); + CompletableFuture<Long> future = new CompletableFuture<>(); + FutureUtils.addListener(locateFuture, (locs, error) -> { + if (error != null) { + future.completeExceptionally(error); + } else { + replicate(future, locs, tableDesc, encodedRegionName, row, entries); } + }); + return future; + } - // If the table is disabled or dropped, we should not replay the entries, and we can skip - // replaying them. However, we might not know whether the table is disabled until we - // invalidate the cache and check from meta - RegionLocations locations = null; - boolean useCache = true; - while (true) { - // get the replicas of the primary region + @Override + public boolean replicate(ReplicateContext replicateContext) { + Map<byte[], Pair<TableDescriptor, List<Entry>>> encodedRegionName2Entries = + new TreeMap<>(Bytes.BYTES_COMPARATOR); + long skippedEdits = 0; + RetryCounter retryCounter = retryCounterFactory.create(); + outer: while (isRunning()) { + encodedRegionName2Entries.clear(); + skippedEdits = 0; + for (Entry entry : replicateContext.getEntries()) { + Optional<TableDescriptor> tableDesc; try { - locations = RegionReplicaReplayCallable - .getRegionLocations(connection, tableName, row, useCache, 0); - - if (locations == null) { - throw new HBaseIOException("Cannot locate locations for " - + tableName + ", row:" + Bytes.toStringBinary(row)); + tableDesc = tableDescriptorCache.get(entry.getKey().getTableName()); + } catch (ExecutionException e) { + LOG.warn("Failed to load table descriptor for {}, attempts={}", + entry.getKey().getTableName(), retryCounter.getAttemptTimes(), e.getCause()); + if (!retryCounter.shouldRetry()) { + return false; } - } catch (TableNotFoundException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Skipping " + entries.size() + " entries because table " + tableName - + " is dropped. 
Adding table to cache."); - for (Entry entry : entries) { - LOG.trace("Skipping : " + entry); - } + try { + retryCounter.sleepUntilNextRetry(); + } catch (InterruptedException e1) { + // restore the interrupted state + Thread.currentThread().interrupt(); + return false; } - disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored - // skip this entry - sink.getSkippedEditsCounter().addAndGet(entries.size()); - return; + continue outer; } - - // check whether we should still replay this entry. If the regions are changed, or the - // entry is not coming from the primary region, filter it out. - HRegionLocation primaryLocation = locations.getDefaultRegionLocation(); - if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(), - encodedRegionName)) { - if (useCache) { - useCache = false; - continue; // this will retry location lookup - } - if (LOG.isTraceEnabled()) { - LOG.trace("Skipping " + entries.size() + " entries in table " + tableName - + " because located region " + primaryLocation.getRegionInfo().getEncodedName() - + " is different than the original region " + Bytes.toStringBinary(encodedRegionName) - + " from WALEdit"); - for (Entry entry : entries) { - LOG.trace("Skipping : " + entry); - } - } - sink.getSkippedEditsCounter().addAndGet(entries.size()); - return; + if (!requiresReplication(tableDesc, entry)) { + skippedEdits++; + continue; } - break; + byte[] encodedRegionName = entry.getKey().getEncodedRegionName(); + encodedRegionName2Entries + .computeIfAbsent(encodedRegionName, k -> Pair.newPair(tableDesc.get(), new ArrayList<>())) + .getSecond().add(entry); } - - if (locations.size() == 1) { - return; - } - - ArrayList<Future<ReplicateWALEntryResponse>> tasks = new ArrayList<>(locations.size() - 1); - - // All passed entries should belong to one region because it is coming from the EntryBuffers - // split per region. But the regions might split and merge (unlike log recovery case). - for (int replicaId = 0; replicaId < locations.size(); replicaId++) { - HRegionLocation location = locations.getRegionLocation(replicaId); - if (!RegionReplicaUtil.isDefaultReplica(replicaId)) { - RegionInfo regionInfo = location == null - ? 
RegionReplicaUtil.getRegionInfoForReplica( - locations.getDefaultRegionLocation().getRegionInfo(), replicaId) - : location.getRegionInfo(); - RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection, - rpcControllerFactory, tableName, location, regionInfo, row, entries, - sink.getSkippedEditsCounter()); - Future<ReplicateWALEntryResponse> task = pool.submit( - new RetryingRpcCallable<>(rpcRetryingCallerFactory, callable, operationTimeout)); - tasks.add(task); - } + break; + } + // send the request to regions + retryCounter = retryCounterFactory.create(); + while (isRunning()) { + List<Pair<CompletableFuture<Long>, byte[]>> futureAndEncodedRegionNameList = + new ArrayList<Pair<CompletableFuture<Long>, byte[]>>(); + for (Map.Entry<byte[], Pair<TableDescriptor, List<Entry>>> entry : encodedRegionName2Entries + .entrySet()) { + CompletableFuture<Long> future = + replicate(entry.getValue().getFirst(), entry.getKey(), entry.getValue().getSecond()); + futureAndEncodedRegionNameList.add(Pair.newPair(future, entry.getKey())); } - - boolean tasksCancelled = false; - for (int replicaId = 0; replicaId < tasks.size(); replicaId++) { + for (Pair<CompletableFuture<Long>, byte[]> pair : futureAndEncodedRegionNameList) { + byte[] encodedRegionName = pair.getSecond(); try { - tasks.get(replicaId).get(); + skippedEdits += pair.getFirst().get().longValue(); + encodedRegionName2Entries.remove(encodedRegionName); } catch (InterruptedException e) { - throw new InterruptedIOException(e.getMessage()); + // restore the interrupted state + Thread.currentThread().interrupt(); + return false; } catch (ExecutionException e) { + Pair<TableDescriptor, List<Entry>> tableAndEntries = + encodedRegionName2Entries.get(encodedRegionName); + TableName tableName = tableAndEntries.getFirst().getTableName(); + List<Entry> entries = tableAndEntries.getSecond(); Throwable cause = e.getCause(); - boolean canBeSkipped = false; - if (cause instanceof IOException) { - // The table can be disabled or dropped at this time. For disabled tables, we have no - // cheap mechanism to detect this case because meta does not contain this information. - // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay - // RPC. So instead we start the replay RPC with retries and check whether the table is - // dropped or disabled which might cause SocketTimeoutException, or - // RetriesExhaustedException or similar if we get IOE. - if (cause instanceof TableNotFoundException - || connection.isTableDisabled(tableName)) { - disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later. - canBeSkipped = true; - } else if (tableDescriptors != null) { - TableDescriptor tableDescriptor = tableDescriptors.get(tableName); - if (tableDescriptor != null - //(replicaId + 1) as no task is added for primary replica for replication - && tableDescriptor.getRegionReplication() <= (replicaId + 1)) { - canBeSkipped = true; - } - } - if (canBeSkipped) { - if (LOG.isTraceEnabled()) { - LOG.trace("Skipping " + entries.size() + " entries in table " + tableName - + " because received exception for dropped or disabled table", - cause); - for (Entry entry : entries) { - LOG.trace("Skipping : " + entry); - } - } - if (!tasksCancelled) { - sink.getSkippedEditsCounter().addAndGet(entries.size()); - tasksCancelled = true; // so that we do not add to skipped counter again - } - continue; - } - - // otherwise rethrow - throw (IOException)cause; + // The table can be disabled or dropped at this time. 
For disabled tables, we have no + // cheap mechanism to detect this case because meta does not contain this information. + // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay + // RPC. So instead we start the replay RPC with retries and check whether the table is + // dropped or disabled which might cause SocketTimeoutException, or + // RetriesExhaustedException or similar if we get IOE. + if (cause instanceof TableNotFoundException) { + // add to cache that the table does not exist + tableDescriptorCache.put(tableName, Optional.empty()); + logSkipped(tableName, entries, "dropped"); + skippedEdits += entries.size(); + encodedRegionName2Entries.remove(encodedRegionName); + continue; + } + boolean disabled = false; + try { + disabled = connection.getAdmin().isTableDisabled(tableName).get(); + } catch (InterruptedException e1) { + // restore the interrupted state + Thread.currentThread().interrupt(); + return false; + } catch (ExecutionException e1) { + LOG.warn("Failed to test whether {} is disabled, assume it is not disabled", tableName, + e1.getCause()); + } + if (disabled) { + disabledTableCache.put(tableName, tableName); + logSkipped(tableName, entries, "disabled"); + skippedEdits += entries.size(); + encodedRegionName2Entries.remove(encodedRegionName); + continue; } - // unexpected exception - throw new IOException(cause); + LOG.warn("Failed to replicate {} entries for region {} of table {}", entries.size(), + Bytes.toStringBinary(encodedRegionName), tableName); + } + } + // we have done + if (encodedRegionName2Entries.isEmpty()) { + ctx.getMetrics().incrLogEditsFiltered(skippedEdits); + return true; + } else { + LOG.warn("Failed to replicate all entris, retry={}", retryCounter.getAttemptTimes()); + if (!retryCounter.shouldRetry()) { + return false; + } + try { + retryCounter.sleepUntilNextRetry(); + } catch (InterruptedException e) { + // restore the interrupted state + Thread.currentThread().interrupt(); + return false; } } } - } - static class RetryingRpcCallable<V> implements Callable<V> { - RpcRetryingCallerFactory factory; - RetryingCallable<V> callable; - int timeout; - public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable, - int timeout) { - this.factory = factory; - this.callable = callable; - this.timeout = timeout; - } - @Override - public V call() throws Exception { - return factory.<V>newCaller().callWithRetries(callable, timeout); - } + return false; } - /** - * Calls replay on the passed edits for the given set of entries belonging to the region. It skips - * the entry if the region boundaries have changed or the region is gone. - */ - static class RegionReplicaReplayCallable extends - RegionAdminServiceCallable<ReplicateWALEntryResponse> { - private final List<Entry> entries; - private final byte[] initialEncodedRegionName; - private final AtomicLong skippedEntries; - - public RegionReplicaReplayCallable(ClusterConnection connection, - RpcControllerFactory rpcControllerFactory, TableName tableName, - HRegionLocation location, RegionInfo regionInfo, byte[] row,List<Entry> entries, - AtomicLong skippedEntries) { - super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId()); - this.entries = entries; - this.skippedEntries = skippedEntries; - this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes(); - } - - @Override - public ReplicateWALEntryResponse call(HBaseRpcController controller) throws Exception { - // Check whether we should still replay this entry. 
If the regions are changed, or the - // entry is not coming form the primary region, filter it out because we do not need it. - // Regions can change because of (1) region split (2) region merge (3) table recreated - boolean skip = false; - if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(), - initialEncodedRegionName)) { - skip = true; - } - if (!this.entries.isEmpty() && !skip) { - Entry[] entriesArray = new Entry[this.entries.size()]; - entriesArray = this.entries.toArray(entriesArray); - - // set the region name for the target region replica - Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = - ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location - .getRegionInfo().getEncodedNameAsBytes(), null, null, null); - controller.setCellScanner(p.getSecond()); - return stub.replay(controller, p.getFirst()); - } + @Override + public boolean canReplicateToSameCluster() { + return true; + } - if (skip) { - if (LOG.isTraceEnabled()) { - LOG.trace("Skipping " + entries.size() + " entries in table " + tableName - + " because located region " + location.getRegionInfo().getEncodedName() - + " is different than the original region " - + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit"); - for (Entry entry : entries) { - LOG.trace("Skipping : " + entry); - } - } - skippedEntries.addAndGet(entries.size()); - } - return ReplicateWALEntryResponse.newBuilder().build(); - } + @Override + protected WALEntryFilter getScopeWALEntryFilter() { + // we do not care about scope. We replicate everything. + return null; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index f1b6e76..c453e21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -282,7 +282,7 @@ public class ReplicationSource implements ReplicationSourceInterface { tableDescriptors = ((HRegionServer) server).getTableDescriptors(); } replicationEndpoint - .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, + .init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs, replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server)); replicationEndpoint.start(); replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java index e1e55f5..9af60c5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java @@ -88,7 +88,7 @@ public class TestAsyncTableNoncedRetry { registry.getClusterId().get(), null, User.getCurrent()) { @Override - public NonceGenerator getNonceGenerator() { + NonceGenerator getNonceGenerator() { return NONCE_GENERATOR; } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java index 04db81a..017d7c9 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java @@ -20,16 +20,17 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; +import java.util.UUID; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell.Type; import org.apache.hadoop.hbase.CellBuilderFactory; @@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; @@ -51,12 +51,12 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.testclassification.FlakeyTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; @@ -383,9 +383,8 @@ public class TestRegionReplicaReplicationEndpoint { testRegionReplicaReplicationIgnores(false, true); } - public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication) + private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication) throws Exception { - // tests having edits from a disabled or dropped table is handled correctly by skipping those // entries and further edits after the edits from dropped/disabled table can be replicated // without problems. @@ -405,8 +404,7 @@ public class TestRegionReplicaReplicationEndpoint { HTU.getAdmin().createTable(htd); // both tables are created, now pause replication - ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); - admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); + HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId()); // now that the replication is disabled, write to the table to be dropped, then drop the table. 
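
The test below exercises exactly the dropped/disabled handling that the rewritten RegionReplicaReplicationEndpoint.replicate() loop above introduces. Condensed into a standalone sketch (the helper class, method name and return convention are invented for illustration and are not part of the patch), the per-region decision for a failed replay is roughly:

import java.util.List;
import java.util.concurrent.ExecutionException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.wal.WAL.Entry;

final class ReplayFailureHandlingSketch {
  /**
   * Decide what to do with a batch of entries whose replay future failed.
   * Returns the number of edits to count as skipped, or -1 to keep the batch for the next retry.
   */
  static long onReplayFailure(AsyncAdmin admin, TableName table, List<Entry> entries,
      Throwable cause) throws InterruptedException {
    if (cause instanceof TableNotFoundException) {
      // the table was dropped: skip its edits
      return entries.size();
    }
    boolean disabled = false;
    try {
      // a single admin round trip per failed batch, not one per replay RPC
      disabled = admin.isTableDisabled(table).get();
    } catch (ExecutionException e) {
      // cannot tell; assume the table is still enabled and let the retry loop try again
    }
    if (disabled) {
      // the table was disabled: its edits can be skipped as well
      return entries.size();
    }
    // anything else is treated as transient; the outer retry loop re-sends the batch
    return -1;
  }
}
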
@@ -416,19 +414,9 @@ public class TestRegionReplicaReplicationEndpoint { HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000); - AtomicLong skippedEdits = new AtomicLong(); - RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = - mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); - when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); - FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(), - FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath()); - RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = - new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, - (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE, - fstd); RegionLocator rl = connection.getRegionLocator(toBeDisabledTable); HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY); - byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); + byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes(); Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A")) .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build(); @@ -436,7 +424,6 @@ public class TestRegionReplicaReplicationEndpoint { new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1), new WALEdit() .add(cell)); - HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table if (dropTable) { HTU.getAdmin().deleteTable(toBeDisabledTable); @@ -445,11 +432,23 @@ public class TestRegionReplicaReplicationEndpoint { HTU.getAdmin().modifyTable(toBeDisabledTable, htd); HTU.getAdmin().enableTable(toBeDisabledTable); } - sinkWriter.append(toBeDisabledTable, encodedRegionName, - HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry)); - - assertEquals(2, skippedEdits.get()); + HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0); + MetricsSource metrics = mock(MetricsSource.class); + ReplicationEndpoint.Context ctx = + new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(), + HTU.getTestFileSystem(), ServerRegionReplicaUtil.getReplicationPeerId(), + UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers() + .getPeer(ServerRegionReplicaUtil.getReplicationPeerId()), + metrics, rs.getTableDescriptors(), rs); + RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint(); + rrpe.init(ctx); + rrpe.start(); + ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext(); + repCtx.setEntries(Lists.newArrayList(entry, entry)); + assertTrue(rrpe.replicate(repCtx)); + verify(metrics, times(1)).incrLogEditsFiltered(eq(2L)); + rrpe.stop(); if (disableReplication) { // enable replication again so that we can verify replication HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table @@ -460,17 +459,14 @@ public class TestRegionReplicaReplicationEndpoint { try { // load some data to the to-be-dropped table - // load the data to the table HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); // now enable the replication - admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); + HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId()); verifyReplication(tableName, regionReplication, 0, 1000); - } finally { - admin.close(); table.close(); rl.close(); tableToBeDisabled.close(); diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java index ab67d94..de0f151 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java @@ -19,15 +19,16 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion; import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.Collections; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -37,24 +38,22 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.ClusterConnectionFactory; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.WALCoprocessor; import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.WALObserver; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext; -import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; @@ -73,8 +72,6 @@ import org.junit.experimental.categories.Category; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; - /** * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this * class contains lower level tests using callables. 
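
Both test classes in this patch now drive the endpoint through the same init/start/replicate/stop cycle instead of the removed sink writer and replay callable. Condensed into one hypothetical helper (the class and method names are made up for illustration; ctx stands for the ReplicationEndpoint.Context the tests build from a live region server):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint;
import org.apache.hadoop.hbase.wal.WAL.Entry;

final class EndpointLifecycleSketch {
  /** Push one batch of WAL entries through the rewritten endpoint, the way the tests do. */
  static boolean pushOnce(ReplicationEndpoint.Context ctx, List<Entry> entries)
      throws IOException {
    RegionReplicaReplicationEndpoint endpoint = new RegionReplicaReplicationEndpoint();
    endpoint.init(ctx); // the Context now carries the Server, see the Context(server, ...) change
    endpoint.start();
    ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
    repCtx.setEntries(entries);
    // true once every entry has either been replayed to the replicas or counted as skipped
    boolean replicated = endpoint.replicate(repCtx);
    endpoint.stop();
    return replicated;
  }
}
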
@@ -178,39 +175,34 @@ public class TestRegionReplicaReplicationEndpointNoMaster { public void testReplayCallable() throws Exception { // tests replaying the edits to a secondary region replica using the Callable directly openRegion(HTU, rs0, hriSecondary); - ClusterConnection connection = - (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration()); - //load some data to primary + // load some data to primary HTU.loadNumericRows(table, f, 0, 1000); Assert.assertEquals(1000, entries.size()); - // replay the edits to the secondary using replay callable - replicateUsingCallable(connection, entries); + try (AsyncClusterConnection conn = ClusterConnectionFactory + .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) { + // replay the edits to the secondary using replay callable + replicateUsingCallable(conn, entries); + } Region region = rs0.getRegion(hriSecondary.getEncodedName()); HTU.verifyNumericRows(region, f, 0, 1000); HTU.deleteNumericRows(table, f, 0, 1000); closeRegion(HTU, rs0, hriSecondary); - connection.close(); } - private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries) - throws IOException, RuntimeException { + private void replicateUsingCallable(AsyncClusterConnection connection, Queue<Entry> entries) + throws IOException, ExecutionException, InterruptedException { Entry entry; while ((entry = entries.poll()) != null) { byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0)); - RegionLocations locations = connection.locateRegion(tableName, row, true, true); - RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection, - RpcControllerFactory.instantiate(connection.getConfiguration()), - table.getName(), locations.getRegionLocation(1), - locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry), - new AtomicLong()); - - RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate( - connection.getConfiguration()); - factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000); + RegionLocations locations = connection.getRegionLocations(tableName, row, true).get(); + connection + .replay(tableName, locations.getRegionLocation(1).getRegion().getEncodedNameAsBytes(), row, + Collections.singletonList(entry), 1, Integer.MAX_VALUE, TimeUnit.SECONDS.toNanos(10)) + .get(); } } @@ -218,49 +210,49 @@ public class TestRegionReplicaReplicationEndpointNoMaster { public void testReplayCallableWithRegionMove() throws Exception { // tests replaying the edits to a secondary region replica using the Callable directly while // the region is moved to another location.It tests handling of RME. 
- openRegion(HTU, rs0, hriSecondary); - ClusterConnection connection = - (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration()); - //load some data to primary - HTU.loadNumericRows(table, f, 0, 1000); + try (AsyncClusterConnection conn = ClusterConnectionFactory + .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) { + openRegion(HTU, rs0, hriSecondary); + // load some data to primary + HTU.loadNumericRows(table, f, 0, 1000); - Assert.assertEquals(1000, entries.size()); - // replay the edits to the secondary using replay callable - replicateUsingCallable(connection, entries); + Assert.assertEquals(1000, entries.size()); - Region region = rs0.getRegion(hriSecondary.getEncodedName()); - HTU.verifyNumericRows(region, f, 0, 1000); + // replay the edits to the secondary using replay callable + replicateUsingCallable(conn, entries); - HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary + Region region = rs0.getRegion(hriSecondary.getEncodedName()); + HTU.verifyNumericRows(region, f, 0, 1000); - // move the secondary region from RS0 to RS1 - closeRegion(HTU, rs0, hriSecondary); - openRegion(HTU, rs1, hriSecondary); + HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary - // replicate the new data - replicateUsingCallable(connection, entries); + // move the secondary region from RS0 to RS1 + closeRegion(HTU, rs0, hriSecondary); + openRegion(HTU, rs1, hriSecondary); - region = rs1.getRegion(hriSecondary.getEncodedName()); - // verify the new data. old data may or may not be there - HTU.verifyNumericRows(region, f, 1000, 2000); + // replicate the new data + replicateUsingCallable(conn, entries); - HTU.deleteNumericRows(table, f, 0, 2000); - closeRegion(HTU, rs1, hriSecondary); - connection.close(); + region = rs1.getRegion(hriSecondary.getEncodedName()); + // verify the new data. old data may or may not be there + HTU.verifyNumericRows(region, f, 1000, 2000); + + HTU.deleteNumericRows(table, f, 0, 2000); + closeRegion(HTU, rs1, hriSecondary); + } } @Test public void testRegionReplicaReplicationEndpointReplicate() throws Exception { // tests replaying the edits to a secondary region replica using the RRRE.replicate() openRegion(HTU, rs0, hriSecondary); - ClusterConnection connection = - (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration()); RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint(); ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class); when(context.getConfiguration()).thenReturn(HTU.getConfiguration()); when(context.getMetrics()).thenReturn(mock(MetricsSource.class)); - + when(context.getServer()).thenReturn(rs0); + when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors()); replicator.init(context); replicator.startAsync(); @@ -272,12 +264,11 @@ public class TestRegionReplicaReplicationEndpointNoMaster { final String fakeWalGroupId = "fakeWALGroup"; replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries)) .setWalGroupId(fakeWalGroupId)); - + replicator.stop(); Region region = rs0.getRegion(hriSecondary.getEncodedName()); HTU.verifyNumericRows(region, f, 0, 1000); HTU.deleteNumericRows(table, f, 0, 1000); closeRegion(HTU, rs0, hriSecondary); - connection.close(); } }
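
For completeness, the lower-level path taken by the NoMaster test maps onto the new AsyncClusterConnection API roughly as follows (a sketch mirroring replicateUsingCallable above; the helper class and method name are invented):

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.wal.WAL.Entry;

final class ReplayToReplicaSketch {
  /** Replay a single WAL entry to replica 1 and return the number of edits the call skipped. */
  static long replayToFirstReplica(AsyncClusterConnection conn, TableName table, byte[] row,
      Entry entry) throws Exception {
    // look up the current locations for the row (same boolean flag as in the test above)
    RegionLocations locs = conn.getRegionLocations(table, row, true).get();
    byte[] encodedRegionName = locs.getRegionLocation(1).getRegion().getEncodedNameAsBytes();
    // replicaId = 1, effectively unlimited retries, 10 second operation timeout as in the test
    return conn.replay(table, encodedRegionName, row, Collections.singletonList(entry),
      1, Integer.MAX_VALUE, TimeUnit.SECONDS.toNanos(10)).get();
  }
}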