HBASE-19079 Support setting up two clusters with A and S state
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f7a7578b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f7a7578b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f7a7578b Branch: refs/heads/HBASE-19064 Commit: f7a7578b785c81b9fba464b3263506416a025b0f Parents: 4f3ad07 Author: zhangduo <zhang...@apache.org> Authored: Tue Apr 10 22:35:19 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri May 25 10:11:48 2018 +0800 ---------------------------------------------------------------------- .../replication/ReplicationPeerManager.java | 5 +- ...ransitPeerSyncReplicationStateProcedure.java | 2 +- .../hbase/regionserver/wal/DualAsyncFSWAL.java | 14 ++ .../hadoop/hbase/regionserver/wal/WALUtil.java | 25 ++- .../hbase/replication/ChainWALEntryFilter.java | 28 +-- .../ReplaySyncReplicationWALCallable.java | 27 ++- .../SyncReplicationPeerInfoProviderImpl.java | 6 +- .../hadoop/hbase/wal/AbstractFSWALProvider.java | 10 +- .../hbase/wal/SyncReplicationWALProvider.java | 94 ++++++--- .../org/apache/hadoop/hbase/wal/WALEdit.java | 8 +- .../org/apache/hadoop/hbase/wal/WALFactory.java | 2 +- .../replication/TestReplicationAdmin.java | 33 +-- .../regionserver/wal/TestWALDurability.java | 2 + .../replication/SyncReplicationTestBase.java | 185 +++++++++++++++++ .../hbase/replication/TestSyncReplication.java | 207 ------------------- .../replication/TestSyncReplicationActive.java | 64 ++++++ .../replication/TestSyncReplicationStandBy.java | 96 +++++++++ 17 files changed, 521 insertions(+), 287 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 41dd6e3..229549e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -68,8 +68,9 @@ public class ReplicationPeerManager { private final ImmutableMap<SyncReplicationState, EnumSet<SyncReplicationState>> allowedTransition = Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE, - EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.STANDBY, - EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.DOWNGRADE_ACTIVE, + EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY), + SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), + SyncReplicationState.DOWNGRADE_ACTIVE, EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE))); ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage, http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index cc51890..5da2b0c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -171,7 +171,7 @@ 
public class TransitPeerSyncReplicationStateProcedure } return Flow.HAS_MORE_STATE; case REPLAY_REMOTE_WAL_IN_PEER: - // TODO: replay remote wal when transiting from S to DA. + addChildProcedure(new RecoverStandbyProcedure(peerId)); setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER); return Flow.HAS_MORE_STATE; case REOPEN_ALL_REGIONS_IN_PEER: http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java index 0495337..a98567a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java @@ -38,6 +38,8 @@ public class DualAsyncFSWAL extends AsyncFSWAL { private final Path remoteWalDir; + private volatile boolean skipRemoteWal = false; + public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir, String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, @@ -51,6 +53,9 @@ public class DualAsyncFSWAL extends AsyncFSWAL { @Override protected AsyncWriter createWriterInstance(Path path) throws IOException { AsyncWriter localWriter = super.createWriterInstance(path); + if (skipRemoteWal) { + return localWriter; + } AsyncWriter remoteWriter; boolean succ = false; try { @@ -64,4 +69,13 @@ public class DualAsyncFSWAL extends AsyncFSWAL { return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter, localWriter); } + + // Allow temporarily skipping the creation of remote writer. 
When failing to write to the remote + // dfs cluster, we need to reopen the regions and switch to use the original wal writer. But we + // need to write a close marker when closing a region, and if it fails, the whole rs will abort. + // So here we need to skip the creation of remote writer and make it possible to write the region + // close marker. + public void skipRemoteWal() { + this.skipRemoteWal = true; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index 1b17adc..3b18253 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -20,11 +20,13 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.util.ArrayList; import java.util.NavigableMap; - +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -34,7 +36,9 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; + import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; @@ -179,4 +183,23 @@ public class WALUtil { return conf.getLong("hbase.regionserver.hlog.blocksize", CommonFSUtils.getDefaultBlockSize(fs, dir) * 2); } + + public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) { + ArrayList<Cell> cells = edit.getCells(); + int size = cells.size(); + int newSize = 0; + for (int i = 0; i < size; i++) { + Cell cell = mapper.apply(cells.get(i)); + if (cell != null) { + cells.set(newSize, cell); + newSize++; + } + } + for (int i = size - 1; i >= newSize; i--) { + cells.remove(i); + } + if (newSize < size / 2) { + cells.trimToSize(); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java index 6f2c764..2bb9811 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java @@ -21,11 +21,11 @@ package org.apache.hadoop.hbase.replication; import java.util.ArrayList; import java.util.Collections; import java.util.List; - -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; /** * A {@link WALEntryFilter} which contains multiple filters and applies them @@ -82,22 +82,16 @@ public class ChainWALEntryFilter implements WALEntryFilter { if (entry == null || cellFilters.length == 0) { return; } - ArrayList<Cell> cells = 
entry.getEdit().getCells(); - int size = cells.size(); - for (int i = size - 1; i >= 0; i--) { - Cell cell = cells.get(i); - for (WALCellFilter filter : cellFilters) { - cell = filter.filterCell(entry, cell); - if (cell != null) { - cells.set(i, cell); - } else { - cells.remove(i); - break; - } + WALUtil.filterCells(entry.getEdit(), c -> filterCell(entry, c)); + } + + private Cell filterCell(Entry entry, Cell cell) { + for (WALCellFilter filter : cellFilters) { + cell = filter.filterCell(entry, cell); + if (cell == null) { + break; } } - if (cells.size() < size / 2) { - cells.trimToSize(); - } + return cell; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java index 8dfe3a2..c9c5ef6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java @@ -21,21 +21,23 @@ import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import 
org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -129,20 +131,31 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { } } + // return whether we should include this entry. + private boolean filter(Entry entry) { + WALEdit edit = entry.getEdit(); + WALUtil.filterCells(edit, c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY) ? null : c); + return !edit.isEmpty(); + } + private List<Entry> readWALEntries(Reader reader) throws IOException { List<Entry> entries = new ArrayList<>(); if (reader == null) { return entries; } long size = 0; - Entry entry = reader.next(); - while (entry != null) { - entries.add(entry); - size += entry.getEdit().heapSize(); - if (size > batchSize) { + for (;;) { + Entry entry = reader.next(); + if (entry == null) { break; } - entry = reader.next(); + if (filter(entry)) { + entries.add(entry); + size += entry.getEdit().heapSize(); + if (size > batchSize) { + break; + } + } } return entries; } http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java index e4afc33..cb33dab 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java @@ -54,8 +54,10 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv } Pair<SyncReplicationState, SyncReplicationState> states = peer.getSyncReplicationStateAndNewState(); - if (states.getFirst() == SyncReplicationState.ACTIVE && - states.getSecond() == SyncReplicationState.NONE) { + if ((states.getFirst() == SyncReplicationState.ACTIVE && + states.getSecond() == SyncReplicationState.NONE) || + (states.getFirst() == SyncReplicationState.DOWNGRADE_ACTIVE && + states.getSecond() == SyncReplicationState.ACTIVE)) { return Optional.of(Pair.newPair(peerId, peer.getPeerConfig().getRemoteWALDir())); } else { return Optional.empty(); http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 3eb8f8f..5a3fba3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -136,8 +136,16 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen walCopy = wal; if (walCopy == null) { walCopy = createWAL(); + boolean succ = false; + try { + walCopy.init(); + succ = true; + } finally { + if (!succ) { + walCopy.close(); + } + } wal = walCopy; - walCopy.init(); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 54287fe..9cbb095 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -69,7 +69,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen private final WALProvider provider; private SyncReplicationPeerInfoProvider peerInfoProvider = - new DefaultSyncReplicationPeerInfoProvider(); + new DefaultSyncReplicationPeerInfoProvider(); private WALFactory factory; @@ -83,7 +83,11 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen private AtomicBoolean initialized = new AtomicBoolean(false); - private final ConcurrentMap<String, DualAsyncFSWAL> peerId2WAL = new ConcurrentHashMap<>(); + // when switching from A to DA, we will put a Optional.empty into this map if there is no WAL for + // the peer yet. When getting WAL from this map the caller should know that it should not use + // DualAsyncFSWAL any more. 
+ private final ConcurrentMap<String, Optional<DualAsyncFSWAL>> peerId2WAL = + new ConcurrentHashMap<>(); private final KeyLocker<String> createLock = new KeyLocker<>(); @@ -123,18 +127,27 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen } private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException { - DualAsyncFSWAL wal = peerId2WAL.get(peerId); - if (wal != null) { - return wal; + Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId); + if (opt != null) { + return opt.orElse(null); } Lock lock = createLock.acquireLock(peerId); try { - wal = peerId2WAL.get(peerId); - if (wal == null) { - wal = createWAL(peerId, remoteWALDir); - peerId2WAL.put(peerId, wal); + opt = peerId2WAL.get(peerId); + if (opt != null) { + return opt.orElse(null); + } + DualAsyncFSWAL wal = createWAL(peerId, remoteWALDir); + boolean succ = false; + try { wal.init(); + succ = true; + } finally { + if (!succ) { + wal.close(); + } } + peerId2WAL.put(peerId, Optional.of(wal)); return wal; } finally { lock.unlock(); @@ -146,18 +159,20 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen if (region == null) { return provider.getWAL(null); } + WAL wal = null; Optional<Pair<String, String>> peerIdAndRemoteWALDir = peerInfoProvider.getPeerIdAndRemoteWALDir(region); if (peerIdAndRemoteWALDir.isPresent()) { Pair<String, String> pair = peerIdAndRemoteWALDir.get(); - return getWAL(pair.getFirst(), pair.getSecond()); - } else { - return provider.getWAL(region); + wal = getWAL(pair.getFirst(), pair.getSecond()); } + return wal != null ? 
wal : provider.getWAL(region); } private Stream<WAL> getWALStream() { - return Streams.concat(peerId2WAL.values().stream(), provider.getWALs().stream()); + return Streams.concat( + peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get), + provider.getWALs().stream()); } @Override @@ -169,12 +184,14 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen public void shutdown() throws IOException { // save the last exception and rethrow IOException failure = null; - for (DualAsyncFSWAL wal : peerId2WAL.values()) { - try { - wal.shutdown(); - } catch (IOException e) { - LOG.error("Shutdown WAL failed", e); - failure = e; + for (Optional<DualAsyncFSWAL> wal : peerId2WAL.values()) { + if (wal.isPresent()) { + try { + wal.get().shutdown(); + } catch (IOException e) { + LOG.error("Shutdown WAL failed", e); + failure = e; + } } } provider.shutdown(); @@ -187,12 +204,14 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen public void close() throws IOException { // save the last exception and rethrow IOException failure = null; - for (DualAsyncFSWAL wal : peerId2WAL.values()) { - try { - wal.close(); - } catch (IOException e) { - LOG.error("Close WAL failed", e); - failure = e; + for (Optional<DualAsyncFSWAL> wal : peerId2WAL.values()) { + if (wal.isPresent()) { + try { + wal.get().close(); + } catch (IOException e) { + LOG.error("Close WAL failed", e); + failure = e; + } } } provider.close(); @@ -208,8 +227,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen @Override public long getLogFileSize() { - return peerId2WAL.values().stream().mapToLong(DualAsyncFSWAL::getLogFileSize).sum() + - provider.getLogFileSize(); + return peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get) + .mapToLong(DualAsyncFSWAL::getLogFileSize).sum() + provider.getLogFileSize(); } private void safeClose(WAL wal) { @@ -231,10 +250,23 @@ public class 
SyncReplicationWALProvider implements WALProvider, PeerActionListen @Override public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from, SyncReplicationState to, int stage) { - // TODO: stage 0 - if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE && - stage == 1) { - safeClose(peerId2WAL.remove(peerId)); + if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE) { + if (stage == 0) { + Lock lock = createLock.acquireLock(peerId); + try { + Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId); + if (opt != null) { + opt.ifPresent(DualAsyncFSWAL::skipRemoteWal); + } else { + // add a place holder to tell the getWAL caller do not use DualAsyncFSWAL any more. + peerId2WAL.put(peerId, Optional.empty()); + } + } finally { + lock.unlock(); + } + } else if (stage == 1) { + peerId2WAL.remove(peerId).ifPresent(this::safeClose); + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java index 1d4dc1b..cd0e52e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java @@ -20,12 +20,11 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; import java.util.ArrayList; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.HeapSize; @@ -33,9 +32,9 
@@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; @@ -54,7 +53,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION, HBaseInterfaceAudience.COPROC }) public class WALEdit implements HeapSize { - private static final Logger LOG = LoggerFactory.getLogger(WALEdit.class); // TODO: Get rid of this; see HBASE-8457 public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY"); http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index afe043f..4f6a898 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -135,7 +135,7 @@ public class WALFactory { static WALProvider createProvider(Class<? 
extends WALProvider> clazz) throws IOException { LOG.info("Instantiating WALProvider of type {}", clazz); try { - return clazz.newInstance(); + return clazz.getDeclaredConstructor().newInstance(); } catch (Exception e) { LOG.error("couldn't set up WALProvider, the configured class is " + clazz); LOG.debug("Exception details for failure to load WALProvider.", e); http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 486ab51..ac98283 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -259,9 +260,11 @@ public class TestReplicationAdmin { TEST_UTIL.createTable(tableName, Bytes.toBytes("family")); ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); - String rootDir = "hdfs://srv1:9999/hbase"; + Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL"); + TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_ONE)); builder.setClusterKey(KEY_ONE); - builder.setRemoteWALDir(rootDir); + builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(), + TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString()); 
builder.setReplicateAllUserTables(false); Map<TableName, List<String>> tableCfs = new HashMap<>(); tableCfs.put(tableName, new ArrayList<>()); @@ -1081,10 +1084,12 @@ public class TestReplicationAdmin { // OK } - String rootDir = "hdfs://srv1:9999/hbase"; + Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL"); + TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_SECOND)); builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_SECOND); - builder.setRemoteWALDir(rootDir); + builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(), + TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString()); builder.setReplicateAllUserTables(false); Map<TableName, List<String>> tableCfs = new HashMap<>(); tableCfs.put(tableName, new ArrayList<>()); @@ -1105,13 +1110,18 @@ public class TestReplicationAdmin { assertEquals(SyncReplicationState.ACTIVE, hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); - try { - hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, - SyncReplicationState.STANDBY); - fail("Can't transit cluster state from ACTIVE to STANDBY"); - } catch (Exception e) { - // OK - } + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY); + assertEquals(SyncReplicationState.STANDBY, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, + SyncReplicationState.DOWNGRADE_ACTIVE); + assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); + + hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); + assertEquals(SyncReplicationState.ACTIVE, + hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.DOWNGRADE_ACTIVE); @@ -1121,7 +1131,6 @@ public class TestReplicationAdmin { 
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY); assertEquals(SyncReplicationState.STANDBY, hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); - try { hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); fail("Can't transit cluster state from STANDBY to ACTIVE"); http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java index 17f24e8..c446306 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALDurability.java @@ -104,6 +104,7 @@ public class TestWALDurability { FileSystem fs = FileSystem.get(conf); Path rootDir = new Path(dir + getName()); CustomFSLog customFSLog = new CustomFSLog(fs, rootDir, getName(), conf); + customFSLog.init(); HRegion region = initHRegion(tableName, null, null, customFSLog); byte[] bytes = Bytes.toBytes(getName()); Put put = new Put(bytes); @@ -118,6 +119,7 @@ public class TestWALDurability { conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true"); fs = FileSystem.get(conf); customFSLog = new CustomFSLog(fs, rootDir, getName(), conf); + customFSLog.init(); region = initHRegion(tableName, null, null, customFSLog); customFSLog.resetSyncFlag(); http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java new file mode 100644 index 0000000..30dbdb5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +/** + * Base class for testing sync replication. 
+ * <p>
+ * Starts two mini HBase clusters ({@link #UTIL1} and {@link #UTIL2}) on top of one shared mini
+ * ZooKeeper cluster, creates the same globally-replicated table on both, and registers each
+ * cluster as the other's replication peer (including a remote WAL directory). Subclasses drive the
+ * sync replication state transitions and use the helper methods below to write and verify data.
+ */
+public class SyncReplicationTestBase {
+
+  /** The ZooKeeper mini cluster shared by both HBase mini clusters. */
+  protected static final HBaseZKTestingUtility ZK_UTIL = new HBaseZKTestingUtility();
+
+  /** First cluster; uses the znode parent {@code /cluster1}. */
+  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
+
+  /** Second cluster; uses the znode parent {@code /cluster2}. */
+  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
+
+  protected static TableName TABLE_NAME = TableName.valueOf("SyncRep");
+
+  protected static byte[] CF = Bytes.toBytes("cf");
+
+  protected static byte[] CQ = Bytes.toBytes("cq");
+
+  /** Id under which each cluster registers the other one as its replication peer. */
+  protected static String PEER_ID = "1";
+
+  /**
+   * Points the given testing utility at the shared ZooKeeper cluster and applies the replication
+   * related configuration used by these tests.
+   * @param util the testing utility to configure; must not have been started yet
+   * @param zkParent the znode parent that keeps this cluster apart from the other one in the
+   *          shared ZooKeeper cluster
+   */
+  private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
+    util.setZkCluster(ZK_UTIL.getZkCluster());
+    Configuration conf = util.getConfiguration();
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
+    conf.setInt("replication.source.size.capacity", 102400);
+    conf.setLong("replication.source.sleepforretries", 100);
+    conf.setInt("hbase.regionserver.maxlogs", 10);
+    conf.setLong("hbase.master.logcleaner.ttl", 10);
+    conf.setInt("zookeeper.recovery.retry", 1);
+    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf.setInt("replication.stats.thread.period.seconds", 5);
+    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    conf.setFloat("replication.source.ratio", 1.0f);
+    conf.setBoolean("replication.source.eof.autorecovery", true);
+  }
+
+  /**
+   * Starts ZooKeeper and both HBase clusters (3 region servers each), creates {@link #TABLE_NAME}
+   * on both, and adds the peer {@link #PEER_ID} on each cluster pointing at the other one.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    ZK_UTIL.startMiniZKCluster();
+    initTestingUtility(UTIL1, "/cluster1");
+    initTestingUtility(UTIL2, "/cluster2");
+    UTIL1.startMiniCluster(3);
+    UTIL2.startMiniCluster(3);
+    TableDescriptor td =
+        TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
+          .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
+    UTIL1.getAdmin().createTable(td);
+    UTIL2.getAdmin().createTable(td);
+    FileSystem fs1 = UTIL1.getTestFileSystem();
+    FileSystem fs2 = UTIL2.getTestFileSystem();
+    Path remoteWALDir1 =
+        new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
+          "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
+    Path remoteWALDir2 =
+        new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(),
+          "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
+    // Each peer config carries the remote WAL directory that lives on the *other* cluster: the
+    // peer added on cluster 1 uses cluster 2's directory and vice versa.
+    UTIL1.getAdmin().addReplicationPeer(PEER_ID,
+      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
+        .setReplicateAllUserTables(false)
+        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
+        .setRemoteWALDir(remoteWALDir2.toUri().toString()).build());
+    UTIL2.getAdmin().addReplicationPeer(PEER_ID,
+      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey())
+        .setReplicateAllUserTables(false)
+        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
+        .setRemoteWALDir(remoteWALDir1.toUri().toString()).build());
+  }
+
+  /**
+   * Shuts down both HBase clusters and the shared ZooKeeper cluster.
+   * <p>
+   * Every shutdown is attempted even if an earlier one throws, so that a single failure does not
+   * leave the remaining clusters running. The first exception still propagates to the caller.
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    try {
+      UTIL1.shutdownMiniCluster();
+    } finally {
+      try {
+        UTIL2.shutdownMiniCluster();
+      } finally {
+        ZK_UTIL.shutdownMiniZKCluster();
+      }
+    }
+  }
+
+  /**
+   * Writes one row per integer in {@code [start, end)} through the client API of the given
+   * cluster. Row key and cell value are both the 4-byte encoding of the integer.
+   */
+  protected final void write(HBaseTestingUtility util, int start, int end) throws IOException {
+    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
+      for (int i = start; i < end; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+    }
+  }
+
+  /**
+   * Asserts, through the client API of the given cluster, that every row in {@code [start, end)}
+   * holds the value written by {@link #write(HBaseTestingUtility, int, int)}.
+   */
+  protected final void verify(HBaseTestingUtility util, int start, int end) throws IOException {
+    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
+      for (int i = start; i < end; i++) {
+        assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
+      }
+    }
+  }
+
+  /**
+   * Same check as {@link #verify(HBaseTestingUtility, int, int)}, but reads directly from the
+   * first region of the table instead of going through the client API.
+   */
+  protected final void verifyThroughRegion(HBaseTestingUtility util, int start, int end)
+      throws IOException {
+    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    for (int i = start; i < end; i++) {
+      assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
+    }
+  }
+
+  /**
+   * Asserts, by reading directly from the first region of the table, that none of the rows in
+   * {@code [start, end)} exists on the given cluster.
+   */
+  protected final void verifyNotReplicatedThroughRegion(HBaseTestingUtility util, int start,
+      int end) throws IOException {
+    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    for (int i = start; i < end; i++) {
+      assertTrue(region.get(new Get(Bytes.toBytes(i))).isEmpty());
+    }
+  }
+
+  /**
+   * Waits up to 30 seconds until the row {@code end - 1} is visible in the first region of the
+   * table on the given cluster. Only this last row is polled; callers that need every row checked
+   * should follow up with {@link #verifyThroughRegion(HBaseTestingUtility, int, int)}.
+   */
+  protected final void waitUntilReplicationDone(HBaseTestingUtility util, int end)
+      throws Exception {
+    // The reject check is in RSRpcService so we can still read through HRegion
+    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    util.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return !region.get(new Get(Bytes.toBytes(end - 1))).isEmpty();
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Replication has not been catched up yet";
+      }
+    });
+  }
+
+  /**
+   * Writes the rows {@code [start, end)} to {@code util1}, waits for them to show up on
+   * {@code util2}, and then verifies every row on {@code util2} by reading through the region.
+   */
+  protected final void writeAndVerifyReplication(HBaseTestingUtility util1,
+      HBaseTestingUtility util2, int start, int end) throws Exception {
+    write(util1, start, end);
+    waitUntilReplicationDone(util2, end);
+    verifyThroughRegion(util2, start, end);
+  }
+
+  /**
+   * Returns the remote WAL directory of the given peer under the WAL root directory of the given
+   * master file system.
+   * @param mfs the master file system whose WAL root directory is used as the base
+   * @param peerId the id of the replication peer
+   * @return {@code <WAL root>/<ReplicationUtils.REMOTE_WAL_DIR_NAME>/<peerId>}
+   */
+  protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) {
+    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
+    // Use the peerId argument; the previous code ignored it and always used the static PEER_ID.
+    return new Path(remoteWALDir, peerId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java
deleted file mode
100644 index 288dcbf..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplication.java +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import static org.hamcrest.CoreMatchers.containsString; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HBaseZKTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; -import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; -import org.apache.hadoop.hbase.client.Delete; -import 
org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RetriesExhaustedException; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.client.TableDescriptorBuilder; -import org.apache.hadoop.hbase.master.MasterFileSystem; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; - -@Category({ ReplicationTests.class, LargeTests.class }) -public class TestSyncReplication { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestSyncReplication.class); - - private static final HBaseZKTestingUtility ZK_UTIL = new HBaseZKTestingUtility(); - - private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility(); - - private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility(); - - private static TableName TABLE_NAME = TableName.valueOf("SyncRep"); - - private static byte[] CF = Bytes.toBytes("cf"); - - private static byte[] CQ = Bytes.toBytes("cq"); - - private static String PEER_ID = "1"; - - private static void initTestingUtility(HBaseTestingUtility util, String zkParent) { - util.setZkCluster(ZK_UTIL.getZkCluster()); - Configuration conf = util.getConfiguration(); - conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent); - conf.setInt("replication.source.size.capacity", 102400); - conf.setLong("replication.source.sleepforretries", 100); - 
conf.setInt("hbase.regionserver.maxlogs", 10); - conf.setLong("hbase.master.logcleaner.ttl", 10); - conf.setInt("zookeeper.recovery.retry", 1); - conf.setInt("zookeeper.recovery.retry.intervalmill", 10); - conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); - conf.setInt("replication.stats.thread.period.seconds", 5); - conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); - conf.setLong("replication.sleep.before.failover", 2000); - conf.setInt("replication.source.maxretriesmultiplier", 10); - conf.setFloat("replication.source.ratio", 1.0f); - conf.setBoolean("replication.source.eof.autorecovery", true); - } - - @BeforeClass - public static void setUp() throws Exception { - ZK_UTIL.startMiniZKCluster(); - initTestingUtility(UTIL1, "/cluster1"); - initTestingUtility(UTIL2, "/cluster2"); - UTIL1.startMiniCluster(3); - UTIL2.startMiniCluster(3); - TableDescriptor td = - TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder - .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build(); - UTIL1.getAdmin().createTable(td); - UTIL2.getAdmin().createTable(td); - FileSystem fs1 = UTIL1.getTestFileSystem(); - FileSystem fs2 = UTIL2.getTestFileSystem(); - Path remoteWALDir1 = - new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(), - "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory()); - Path remoteWALDir2 = - new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getRootDir(), - "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory()); - UTIL1.getAdmin().addReplicationPeer(PEER_ID, - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()) - .setReplicateAllUserTables(false) - .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) - .setRemoteWALDir(remoteWALDir2.toUri().toString()).build()); - UTIL2.getAdmin().addReplicationPeer(PEER_ID, - ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey()) - 
.setReplicateAllUserTables(false) - .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) - .setRemoteWALDir(remoteWALDir1.toUri().toString()).build()); - } - - @AfterClass - public static void tearDown() throws Exception { - UTIL1.shutdownMiniCluster(); - UTIL2.shutdownMiniCluster(); - ZK_UTIL.shutdownMiniZKCluster(); - } - - @FunctionalInterface - private interface TableAction { - - void call(Table table) throws IOException; - } - - private void assertDisallow(Table table, TableAction action) throws IOException { - try { - action.call(table); - } catch (DoNotRetryIOException | RetriesExhaustedException e) { - // expected - assertThat(e.getMessage(), containsString("STANDBY")); - } - } - - @Test - public void testStandby() throws Exception { - MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem(); - Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); - Path remoteWALDirForPeer = new Path(remoteWALDir, PEER_ID); - assertFalse(mfs.getWALFileSystem().exists(remoteWALDirForPeer)); - UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, - SyncReplicationState.STANDBY); - assertTrue(mfs.getWALFileSystem().exists(remoteWALDirForPeer)); - try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) { - assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row")))); - assertDisallow(table, - t -> t.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))); - assertDisallow(table, t -> t.delete(new Delete(Bytes.toBytes("row")))); - assertDisallow(table, t -> t.incrementColumnValue(Bytes.toBytes("row"), CF, CQ, 1)); - assertDisallow(table, - t -> t.append(new Append(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))); - assertDisallow(table, - t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1"))))); - assertDisallow(table, - t -> t - .put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, 
Bytes.toBytes("row")), - new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1"))))); - assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row")) - .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))))); - } - // But we should still allow replication writes - try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) { - for (int i = 0; i < 100; i++) { - table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); - } - } - // The reject check is in RSRpcService so we can still read through HRegion - HRegion region = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0); - UTIL2.waitFor(30000, new ExplainingPredicate<Exception>() { - - @Override - public boolean evaluate() throws Exception { - return !region.get(new Get(Bytes.toBytes(99))).isEmpty(); - } - - @Override - public String explainFailure() throws Exception { - return "Replication has not been catched up yet"; - } - }); - for (int i = 0; i < 100; i++) { - assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ))); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java new file mode 100644 index 0000000..f4fb5fe --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests a pair of clusters where one is in the ACTIVE sync replication state and the other one is
+ * in STANDBY, including swapping the roles and downgrading while the remote cluster is down.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationActive extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
+
+  /**
+   * Scenario, in order:
+   * <ol>
+   * <li>cluster 2 becomes STANDBY, cluster 1 becomes ACTIVE, and the peer on cluster 1 is
+   * disabled;</li>
+   * <li>rows [0, 100) are written to cluster 1 and must not show up in cluster 2's region;</li>
+   * <li>cluster 2 is moved to DOWNGRADE_ACTIVE and the rows must then be readable from it;</li>
+   * <li>the roles are swapped (cluster 1 STANDBY, cluster 2 ACTIVE) and rows [100, 200) written
+   * to cluster 2 must be replicated to cluster 1;</li>
+   * <li>cluster 1 is shut down, and cluster 2 must still be able to move to DOWNGRADE_ACTIVE and
+   * accept writes.</li>
+   * </ol>
+   */
+  @Test
+  public void testActive() throws Exception {
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    write(UTIL1, 0, 100);
+    // Fixed pause before the negative check below; presumably long enough for edits to have been
+    // shipped if the disabled peer were (wrongly) still replicating.
+    Thread.sleep(2000);
+    // peer is disabled so no data have been replicated
+    verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    // confirm that the data is there after we convert the peer to DA
+    // NOTE(review): the rows presumably arrive via the remote WAL rather than normal replication,
+    // since the peer on cluster 1 is still disabled here -- confirm against the server side code.
+    verify(UTIL2, 0, 100);
+
+    // Swap the roles: cluster 1 is now the standby and cluster 2 the active one.
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+
+    writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);
+
+    // shutdown the cluster completely
+    UTIL1.shutdownMiniCluster();
+    // confirm that we can convert to DA even if the remote slave cluster is down
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    // The write must succeed although the peer cluster is gone.
+    write(UTIL2, 200, 300);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/f7a7578b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
new file mode 100644
index 0000000..ed61d2a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationStandBy.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests a cluster in the STANDBY sync replication state: client reads and writes must be rejected
+ * while edits replicated from the peer cluster are still applied.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSyncReplicationStandBy extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSyncReplicationStandBy.class);
+
+  /**
+   * A table operation that may throw {@link IOException}; declared here because the standard
+   * functional interfaces do not allow checked exceptions.
+   */
+  @FunctionalInterface
+  private interface TableAction {
+
+    void call(Table table) throws IOException;
+  }
+
+  /**
+   * Asserts that the given action is rejected because the cluster is in STANDBY state.
+   * <p>
+   * The action has to fail with a {@link DoNotRetryIOException} or a
+   * {@link RetriesExhaustedException} whose message mentions "STANDBY". An action that completes
+   * normally fails the test; previously that case was silently accepted, so the checks in
+   * {@link #testStandby()} could never detect a missing rejection.
+   * @param table the table to run the action against
+   * @param action the operation that is expected to be rejected
+   * @throws IOException if the action fails with any other I/O error
+   */
+  private void assertDisallow(Table table, TableAction action) throws IOException {
+    try {
+      action.call(table);
+    } catch (DoNotRetryIOException | RetriesExhaustedException e) {
+      // expected
+      assertThat(e.getMessage(), containsString("STANDBY"));
+      return;
+    }
+    // Reaching this point means the action went through. Thrown outside of the try block so that
+    // it is obvious the catch clause above can never swallow it.
+    throw new AssertionError("The action should have been rejected in STANDBY state");
+  }
+
+  /**
+   * Moves cluster 2 to STANDBY and checks that:
+   * <ul>
+   * <li>the remote WAL directory of the peer does not exist before the transition and exists
+   * afterwards;</li>
+   * <li>get, put, delete, increment, append, multi-get, multi-put and mutateRow issued by a
+   * client against cluster 2 are all rejected;</li>
+   * <li>rows written to cluster 1 are still replicated to cluster 2.</li>
+   * </ul>
+   */
+  @Test
+  public void testStandby() throws Exception {
+    MasterFileSystem mfs = UTIL2.getHBaseCluster().getMaster().getMasterFileSystem();
+    Path remoteWALDir = getRemoteWALDir(mfs, PEER_ID);
+    assertFalse(mfs.getWALFileSystem().exists(remoteWALDir));
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    assertTrue(mfs.getWALFileSystem().exists(remoteWALDir));
+    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
+      assertDisallow(table, t -> t.get(new Get(Bytes.toBytes("row"))));
+      assertDisallow(table,
+        t -> t.put(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
+      assertDisallow(table, t -> t.delete(new Delete(Bytes.toBytes("row"))));
+      assertDisallow(table, t -> t.incrementColumnValue(Bytes.toBytes("row"), CF, CQ, 1));
+      assertDisallow(table,
+        t -> t.append(new Append(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row"))));
+      assertDisallow(table,
+        t -> t.get(Arrays.asList(new Get(Bytes.toBytes("row")), new Get(Bytes.toBytes("row1")))));
+      assertDisallow(table,
+        t -> t
+          .put(Arrays.asList(new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")),
+            new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, Bytes.toBytes("row1")))));
+      assertDisallow(table, t -> t.mutateRow(new RowMutations(Bytes.toBytes("row"))
+        .add((Mutation) new Put(Bytes.toBytes("row")).addColumn(CF, CQ, Bytes.toBytes("row")))));
+    }
+    // We should still allow replication writes
+    writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
+  }
+}