HBASE-19082 Reject read/write from client but accept write from replication in state S (STANDBY)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6be7b9c4 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6be7b9c4 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6be7b9c4 Branch: refs/heads/HBASE-19064 Commit: 6be7b9c4c57807e483c549869b037b08e88281cc Parents: f6e00b4 Author: zhangduo <zhang...@apache.org> Authored: Mon Feb 12 18:20:18 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Mar 7 18:12:44 2018 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/HConstants.java | 3 - .../src/main/protobuf/MasterProcedure.proto | 3 +- .../hbase/replication/ReplicationUtils.java | 4 + ...ransitPeerSyncReplicationStateProcedure.java | 10 + .../hadoop/hbase/regionserver/HRegion.java | 5 +- .../hbase/regionserver/HRegionServer.java | 2 +- .../hbase/regionserver/RSRpcServices.java | 88 ++++++-- .../RejectRequestsFromClientStateChecker.java | 44 ++++ .../regionserver/ReplicationSink.java | 72 ++++--- .../SyncReplicationPeerInfoProvider.java | 10 +- .../SyncReplicationPeerInfoProviderImpl.java | 19 +- .../hbase/wal/SyncReplicationWALProvider.java | 3 + .../org/apache/hadoop/hbase/wal/WALFactory.java | 4 +- .../hbase/replication/TestSyncReplication.java | 200 +++++++++++++++++++ .../wal/TestSyncReplicationWALProvider.java | 8 +- 15 files changed, 401 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index c7c4665..3dd0ac8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1330,9 +1330,6 @@ public final class HConstants { public static final String NOT_IMPLEMENTED = "Not implemented"; - // TODO: need to find a better place to hold it. - public static final String SYNC_REPLICATION_ENABLED = "hbase.replication.sync.enabled"; - private HConstants() { // Can't be instantiated with this ctor. } http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index c361389..fe08be5 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -385,7 +385,8 @@ enum PeerSyncReplicationStateTransitionState { REOPEN_ALL_REGIONS_IN_PEER = 5; TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 6; REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 7; - POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 8; + CREATE_DIR_FOR_REMOTE_WAL = 8; + POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 9; } message PeerModificationStateData { http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index d633be9..9d0c05d 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -37,6 +37,10 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private 
public final class ReplicationUtils { + public static final String SYNC_REPLICATION_ENABLED = "hbase.replication.sync.enabled"; + + public static final String REPLICATION_ATTR_NAME = "__rep__"; + private ReplicationUtils() { } http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index c253bff..e53abc0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -191,8 +191,18 @@ public class TransitPeerSyncReplicationStateProcedure addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1)) .toArray(RefreshPeerProcedure[]::new)); + if (toState == SyncReplicationState.STANDBY) { + setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL); + } else { + setNextState( + PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); + } + return Flow.HAS_MORE_STATE; + case CREATE_DIR_FOR_REMOTE_WAL: + // TODO: create wal for write remote wal setNextState( PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION); + return Flow.HAS_MORE_STATE; case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION: try { postTransit(env); http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index f071baf..905a5d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4228,12 +4228,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Add updates first to the wal and then add values to memstore. + * <p> * Warning: Assumption is caller has lock on passed in row. * @param edits Cell updates by column - * @throws IOException */ - void put(final byte [] row, byte [] family, List<Cell> edits) - throws IOException { + void put(final byte[] row, byte[] family, List<Cell> edits) throws IOException { NavigableMap<byte[], List<Cell>> familyMap; familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d617bc6..e5028b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1804,7 +1804,7 @@ public class HRegionServer extends HasThread implements boolean isMasterNoTableOrSystemTableOnly = this instanceof HMaster && (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf)); if (isMasterNoTableOrSystemTableOnly) { - conf.setBoolean(HConstants.SYNC_REPLICATION_ENABLED, false); + 
conf.setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, false); } WALFactory factory = new WALFactory(conf, serverName.toString()); if (!isMasterNoTableOrSystemTableOnly) { http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 803d3e8..b8123e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -116,6 +116,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.AccessChecker; @@ -2407,6 +2409,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return region.execService(execController, serviceCall); } + private boolean shouldRejectRequestsFromClient(HRegion region) { + return regionServer.getReplicationSourceService().getSyncReplicationPeerInfoProvider() + .checkState(region.getRegionInfo(), RejectRequestsFromClientStateChecker.get()); + } + + private void rejectIfInStandByState(HRegion region) throws DoNotRetryIOException { + if (shouldRejectRequestsFromClient(region)) { + throw new DoNotRetryIOException( + 
region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state."); + } + } + /** * Get data from a table. * @@ -2415,8 +2429,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @throws ServiceException */ @Override - public GetResponse get(final RpcController controller, - final GetRequest request) throws ServiceException { + public GetResponse get(final RpcController controller, final GetRequest request) + throws ServiceException { long before = EnvironmentEdgeManager.currentTime(); OperationQuota quota = null; HRegion region = null; @@ -2425,6 +2439,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, requestCount.increment(); rpcGetRequestCount.increment(); region = getRegion(request.getRegion()); + rejectIfInStandByState(region); GetResponse.Builder builder = GetResponse.newBuilder(); ClientProtos.Get get = request.getGet(); @@ -2554,16 +2569,45 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + private void failRegionAction(MultiResponse.Builder responseBuilder, + RegionActionResult.Builder regionActionResultBuilder, RegionAction regionAction, + CellScanner cellScanner, Throwable error) { + rpcServer.getMetrics().exception(error); + regionActionResultBuilder.setException(ResponseConverter.buildException(error)); + responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); + // All Mutations in this RegionAction not executed as we can not see the Region online here + // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner + // corresponding to these Mutations. + if (cellScanner != null) { + skipCellsForMutations(regionAction.getActionList(), cellScanner); + } + } + + private boolean isReplicationRequest(Action action) { + // replication request can only be put or delete. 
+ if (!action.hasMutation()) { + return false; + } + MutationProto mutation = action.getMutation(); + MutationType type = mutation.getMutateType(); + if (type != MutationType.PUT && type != MutationType.DELETE) { + return false; + } + // replication will set a special attribute so we can make use of it to decide whether a request + // is for replication. + return mutation.getAttributeList().stream().map(p -> p.getName()) + .filter(n -> n.equals(ReplicationUtils.REPLICATION_ATTR_NAME)).findAny().isPresent(); + } + /** * Execute multiple actions on a table: get, mutate, and/or execCoprocessor - * * @param rpcc the RPC controller * @param request the multi request * @throws ServiceException */ @Override public MultiResponse multi(final RpcController rpcc, final MultiRequest request) - throws ServiceException { + throws ServiceException { try { checkOpen(); } catch (IOException ie) { @@ -2603,17 +2647,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler, region = getRegion(regionSpecifier); quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList()); } catch (IOException e) { - rpcServer.getMetrics().exception(e); - regionActionResultBuilder.setException(ResponseConverter.buildException(e)); - responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); - // All Mutations in this RegionAction not executed as we can not see the Region online here - // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner - // corresponding to these Mutations. - skipCellsForMutations(regionAction.getActionList(), cellScanner); + failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); continue; // For this region it's a failure. } - + boolean rejectIfFromClient = shouldRejectRequestsFromClient(region); if (regionAction.hasAtomic() && regionAction.getAtomic()) { + // We only allow replication in standby state and it will not set the atomic flag. 
+ if (rejectIfFromClient) { + failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, + new DoNotRetryIOException( + region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state")); + quota.close(); + continue; + } // How does this call happen? It may need some work to play well w/ the surroundings. // Need to return an item per Action along w/ Action index. TODO. try { @@ -2640,6 +2686,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, regionActionResultBuilder.setException(ResponseConverter.buildException(e)); } } else { + if (rejectIfFromClient && regionAction.getActionCount() > 0 && + !isReplicationRequest(regionAction.getAction(0))) { + // fail if it is not a replication request + failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, + new DoNotRetryIOException( + region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state")); + quota.close(); + continue; + } // doNonAtomicRegionMutation manages the exception internally if (context != null && closeCallBack == null) { // An RpcCallBack that creates a list of scanners that needs to perform callBack @@ -2655,7 +2710,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, responseBuilder.addRegionActionResult(regionActionResultBuilder.build()); quota.close(); ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics(); - if(regionLoadStats != null) { + if (regionLoadStats != null) { regionStats.put(regionSpecifier, regionLoadStats); } } @@ -2714,8 +2769,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param request the mutate request */ @Override - public MutateResponse mutate(final RpcController rpcc, - final MutateRequest request) throws ServiceException { + public MutateResponse mutate(final RpcController rpcc, final MutateRequest request) + throws ServiceException { // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. 
// It is also the conduit via which we pass back data. HBaseRpcController controller = (HBaseRpcController)rpcc; @@ -2735,6 +2790,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, requestCount.increment(); rpcMutateRequestCount.increment(); region = getRegion(request.getRegion()); + rejectIfInStandByState(region); MutateResponse.Builder builder = MutateResponse.newBuilder(); MutationProto mutation = request.getMutation(); if (!region.getRegionInfo().isMetaRegion()) { @@ -2899,6 +2955,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, "'hbase.client.scanner.timeout.period' configuration."); } } + rejectIfInStandByState(rsh.r); RegionInfo hri = rsh.s.getRegionInfo(); // Yes, should be the same instance if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) { @@ -2925,6 +2982,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder) throws IOException { HRegion region = getRegion(request.getRegion()); + rejectIfInStandByState(region); ClientProtos.Scan protoScan = request.getScan(); boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand(); Scan scan = ProtobufUtil.toScan(protoScan); http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectRequestsFromClientStateChecker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectRequestsFromClientStateChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectRequestsFromClientStateChecker.java new file mode 100644 index 0000000..8e68f0f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RejectRequestsFromClientStateChecker.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.function.BiPredicate; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Check whether we need to reject the request from client. + */ +@InterfaceAudience.Private +public class RejectRequestsFromClientStateChecker + implements BiPredicate<SyncReplicationState, SyncReplicationState> { + + private static final RejectRequestsFromClientStateChecker INST = + new RejectRequestsFromClientStateChecker(); + + @Override + public boolean test(SyncReplicationState state, SyncReplicationState newState) { + // reject requests from client if we are in standby state, or we are going to transit to standby + // state. 
+ return state == SyncReplicationState.STANDBY || newState == SyncReplicationState.STANDBY; + } + + public static RejectRequestsFromClientStateChecker get() { + return INST; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 5a05660..ae072d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -29,7 +28,6 @@ import java.util.Map.Entry; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -41,9 +39,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; @@ -52,13 +47,18 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.Row; import 
org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; /** * <p> @@ -82,10 +82,10 @@ public class ReplicationSink { private final Configuration conf; // Volatile because of note in here -- look for double-checked locking: // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html - private volatile Connection sharedHtableCon; + private volatile Connection sharedConn; private final MetricsSink metrics; private final AtomicLong totalReplicatedEdits = new AtomicLong(); - private final Object sharedHtableConLock = new Object(); + private final Object sharedConnLock = new Object(); // Number of hfiles that we successfully replicated private long hfilesReplicated = 0; private SourceFSConfigurationProvider provider; @@ -108,12 +108,12 @@ public class ReplicationSink { conf.get("hbase.replication.source.fs.conf.provider", DefaultSourceFSConfigurationProvider.class.getCanonicalName()); try { - @SuppressWarnings("rawtypes") - Class c = Class.forName(className); - this.provider = (SourceFSConfigurationProvider) c.getDeclaredConstructor().newInstance(); + Class<? 
extends SourceFSConfigurationProvider> c = + Class.forName(className).asSubclass(SourceFSConfigurationProvider.class); + this.provider = c.getDeclaredConstructor().newInstance(); } catch (Exception e) { - throw new IllegalArgumentException("Configured source fs configuration provider class " - + className + " throws error.", e); + throw new IllegalArgumentException( + "Configured source fs configuration provider class " + className + " throws error.", e); } } @@ -217,6 +217,8 @@ public class ReplicationSink { clusterIds.add(toUUID(clusterId)); } mutation.setClusterIds(clusterIds); + mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, + HConstants.EMPTY_BYTE_ARRAY); addToHashMultiMap(rowMap, table, clusterIds, mutation); } if (CellUtil.isDelete(cell)) { @@ -370,11 +372,11 @@ public class ReplicationSink { */ public void stopReplicationSinkServices() { try { - if (this.sharedHtableCon != null) { - synchronized (sharedHtableConLock) { - if (this.sharedHtableCon != null) { - this.sharedHtableCon.close(); - this.sharedHtableCon = null; + if (this.sharedConn != null) { + synchronized (sharedConnLock) { + if (this.sharedConn != null) { + this.sharedConn.close(); + this.sharedConn = null; } } } @@ -390,14 +392,12 @@ public class ReplicationSink { * @param allRows list of actions * @throws IOException */ - protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException { + private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException { if (allRows.isEmpty()) { return; } - Table table = null; - try { - Connection connection = getConnection(); - table = connection.getTable(tableName); + Connection connection = getConnection(); + try (Table table = connection.getTable(tableName)) { for (List<Row> rows : allRows) { table.batch(rows, null); } @@ -410,21 +410,18 @@ public class ReplicationSink { throw rewde; } catch (InterruptedException ix) { throw (InterruptedIOException) new 
InterruptedIOException().initCause(ix); - } finally { - if (table != null) { - table.close(); - } } } private Connection getConnection() throws IOException { // See https://en.wikipedia.org/wiki/Double-checked_locking - Connection connection = sharedHtableCon; + Connection connection = sharedConn; if (connection == null) { - synchronized (sharedHtableConLock) { - connection = sharedHtableCon; + synchronized (sharedConnLock) { + connection = sharedConn; if (connection == null) { - connection = sharedHtableCon = ConnectionFactory.createConnection(conf); + connection = ConnectionFactory.createConnection(conf); + sharedConn = connection; } } } @@ -437,9 +434,10 @@ public class ReplicationSink { * of the last edit that was applied */ public String getStats() { - return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " + - "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() + - ", total replicated edits: " + this.totalReplicatedEdits; + long total = this.totalReplicatedEdits.get(); + return total == 0 ?
"" : "Sink: " + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() + + ", total replicated edits: " + total; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java index 92f2c52..66fe3be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.Optional; +import java.util.function.BiPredicate; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; @@ -36,8 +37,11 @@ public interface SyncReplicationPeerInfoProvider { Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info); /** - * Check whether the give region is contained in a sync replication peer which is in the given - * state. + * Check whether the given region is contained in a sync replication peer which can pass the state + * checker. + * <p> + * Will call the checker with current sync replication state and new sync replication state.
*/ - boolean isInState(RegionInfo info, SyncReplicationState state); + boolean checkState(RegionInfo info, + BiPredicate<SyncReplicationState, SyncReplicationState> checker); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java index 32159e6..973e049 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java @@ -18,8 +18,9 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.Optional; +import java.util.function.BiPredicate; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.replication.ReplicationPeer; +import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; @@ -44,11 +45,14 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv if (peerId == null) { return Optional.empty(); } - ReplicationPeer peer = replicationPeers.getPeer(peerId); + ReplicationPeerImpl peer = replicationPeers.getPeer(peerId); if (peer == null) { return Optional.empty(); } - if (peer.getSyncReplicationState() == SyncReplicationState.ACTIVE) { + Pair<SyncReplicationState, SyncReplicationState> states = + peer.getSyncReplicationStateAndNewState(); + if (states.getFirst() == SyncReplicationState.ACTIVE && + 
states.getSecond() == SyncReplicationState.NONE) { return Optional.of(Pair.newPair(peerId, peer.getPeerConfig().getRemoteWALDir())); } else { return Optional.empty(); @@ -56,16 +60,19 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv } @Override - public boolean isInState(RegionInfo info, SyncReplicationState state) { + public boolean checkState(RegionInfo info, + BiPredicate<SyncReplicationState, SyncReplicationState> checker) { String peerId = mapping.getPeerId(info); if (peerId == null) { return false; } - ReplicationPeer peer = replicationPeers.getPeer(peerId); + ReplicationPeerImpl peer = replicationPeers.getPeer(peerId); if (peer == null) { return false; } - return peer.getSyncReplicationState() == state; + Pair<SyncReplicationState, SyncReplicationState> states = + peer.getSyncReplicationStateAndNewState(); + return checker.test(states.getFirst(), states.getSecond()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index e3de6b4..ac4b4cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -141,6 +141,9 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen @Override public WAL getWAL(RegionInfo region) throws IOException { + if (region == null) { + return provider.getWAL(region); + } Optional<Pair<String, String>> peerIdAndRemoteWALDir = peerInfoProvider.getPeerIdAndRemoteWALDir(region); if (peerIdAndRemoteWALDir.isPresent()) { 
http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 202b584..1b8f52e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -24,10 +24,10 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; @@ -160,7 +160,7 @@ public class WALFactory { // end required early initialization if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); - if (conf.getBoolean(HConstants.SYNC_REPLICATION_ENABLED, false)) { + if (conf.getBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, false)) { provider = new SyncReplicationWALProvider(provider); } provider.init(this, conf, null); http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java new file mode 100644 index 0000000..acddc4a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestSyncReplication { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + 
HBaseClassTestRule.forClass(TestSyncReplication.class); + + private static final HBaseZKTestingUtility ZK_UTIL = new HBaseZKTestingUtility(); + + private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility(); + + private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("SyncRep"); + + private static byte[] CF = Bytes.toBytes("cf"); + + private static byte[] CQ = Bytes.toBytes("cq"); + + private static String PEER_ID = "1"; + + private static void initTestingUtility(HBaseTestingUtility util, String zkParent) { + util.setZkCluster(ZK_UTIL.getZkCluster()); + Configuration conf = util.getConfiguration(); + conf.setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, true); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent); + conf.setInt("replication.source.size.capacity", 102400); + conf.setLong("replication.source.sleepforretries", 100); + conf.setInt("hbase.regionserver.maxlogs", 10); + conf.setLong("hbase.master.logcleaner.ttl", 10); + conf.setInt("zookeeper.recovery.retry", 1); + conf.setInt("zookeeper.recovery.retry.intervalmill", 10); + conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf.setInt("replication.stats.thread.period.seconds", 5); + conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); + conf.setLong("replication.sleep.before.failover", 2000); + conf.setInt("replication.source.maxretriesmultiplier", 10); + conf.setFloat("replication.source.ratio", 1.0f); + conf.setBoolean("replication.source.eof.autorecovery", true); + } + + @BeforeClass + public static void setUp() throws Exception { + ZK_UTIL.startMiniZKCluster(); + initTestingUtility(UTIL1, "/cluster1"); + initTestingUtility(UTIL2, "/cluster2"); + UTIL1.startMiniCluster(3); + UTIL2.startMiniCluster(3); + TableDescriptor td = + TableDescriptorBuilder.newBuilder(TABLE_NAME).addColumnFamily(ColumnFamilyDescriptorBuilder + 
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build(); + UTIL1.getAdmin().createTable(td); + UTIL2.getAdmin().createTable(td); + FileSystem fs1 = UTIL1.getTestFileSystem(); + FileSystem fs2 = UTIL2.getTestFileSystem(); + Path remoteWALDir1 = + new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(), + "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory()); + Path remoteWALDir2 = + new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(), + "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory()); + UTIL1.getAdmin().addReplicationPeer(PEER_ID, + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()) + .setReplicateAllUserTables(false) + .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) + .setRemoteWALDir(remoteWALDir2.toUri().toString()).build()); + UTIL2.getAdmin().addReplicationPeer(PEER_ID, + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey()) + .setReplicateAllUserTables(false) + .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) + .setRemoteWALDir(remoteWALDir1.toUri().toString()).build()); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL1.shutdownMiniCluster(); + UTIL2.shutdownMiniCluster(); + ZK_UTIL.shutdownMiniZKCluster(); + } + + @FunctionalInterface + private interface TableAction { + + void call(Table table) throws IOException; + } + + private void assertDisallow(Table table, TableAction action) throws IOException { + try { + action.call(table); + } catch (DoNotRetryIOException | RetriesExhaustedException e) { + // expected + assertThat(e.getMessage(), containsString("STANDBY")); + } + } + + @Test + public void testStandby() throws Exception { + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) { + assertDisallow(table, t -> t.get(new 
Get(Bytes.toBytes("row")))); + assertDisallow(table, + t -> t.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))); + assertDisallow(table, t -> t.delete(new Delete(Bytes.toBytes("row")))); + assertDisallow(table, t -> t.incrementColumnValue(Bytes.toBytes("row"), CF, CQ, 1)); + assertDisallow(table, + t -> t.append(new Append(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))); + assertDisallow(table, + t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1"))))); + assertDisallow(table, + t -> t + .put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")), + new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1"))))); + assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row")) + .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))))); + } + // But we should still allow replication writes + try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + // The reject check is in RSRpcService so we can still read through HRegion + HRegion region = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); + UTIL2.waitFor(30000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + return !region.get(new Get(Bytes.toBytes(99))).isEmpty(); + } + + @Override + public String explainFailure() throws Exception { + return "Replication has not been catched up yet"; + } + }); + for (int i = 0; i < 100; i++) { + assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ))); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6be7b9c4/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java index 986228c..488d9fb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java @@ -24,10 +24,10 @@ import static org.junit.Assert.assertThat; import java.io.IOException; import java.util.Optional; +import java.util.function.BiPredicate; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.RegionInfo; @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -84,7 +85,8 @@ public class TestSyncReplicationWALProvider { } @Override - public boolean isInState(RegionInfo info, SyncReplicationState state) { + public boolean checkState(RegionInfo info, + BiPredicate<SyncReplicationState, SyncReplicationState> checker) { // TODO Implement SyncReplicationPeerInfoProvider.isInState return false; } @@ -92,7 +94,7 @@ public class TestSyncReplicationWALProvider { @BeforeClass public static void setUpBeforeClass() throws Exception { - 
UTIL.getConfiguration().setBoolean(HConstants.SYNC_REPLICATION_ENABLED, true); + UTIL.getConfiguration().setBoolean(ReplicationUtils.SYNC_REPLICATION_ENABLED, true); UTIL.startMiniDFSCluster(3); FACTORY = new WALFactory(UTIL.getConfiguration(), "test"); ((SyncReplicationWALProvider) FACTORY.getWALProvider()).setPeerInfoProvider(new InfoProvider());