http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f8234b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java new file mode 100644 index 0000000..92f2c52 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProvider.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.Optional; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Get the information for a sync replication peer. 
+ */ +@InterfaceAudience.Private +public interface SyncReplicationPeerInfoProvider { + + /** + * Return the peer id and remote WAL directory if the region is synchronously replicated and the + * state is {@link SyncReplicationState#ACTIVE}. + */ + Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info); + + /** + * Check whether the given region is contained in a sync replication peer which is in the given + * state. + */ + boolean isInState(RegionInfo info, SyncReplicationState state); +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f8234b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java new file mode 100644 index 0000000..32159e6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.Optional; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.replication.ReplicationPeer; +import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProvider { + + private final ReplicationPeers replicationPeers; + + private final SyncReplicationPeerMappingManager mapping; + + SyncReplicationPeerInfoProviderImpl(ReplicationPeers replicationPeers, + SyncReplicationPeerMappingManager mapping) { + this.replicationPeers = replicationPeers; + this.mapping = mapping; + } + + @Override + public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) { + String peerId = mapping.getPeerId(info); + if (peerId == null) { + return Optional.empty(); + } + ReplicationPeer peer = replicationPeers.getPeer(peerId); + if (peer == null) { + return Optional.empty(); + } + if (peer.getSyncReplicationState() == SyncReplicationState.ACTIVE) { + return Optional.of(Pair.newPair(peerId, peer.getPeerConfig().getRemoteWALDir())); + } else { + return Optional.empty(); + } + } + + @Override + public boolean isInState(RegionInfo info, SyncReplicationState state) { + String peerId = mapping.getPeerId(info); + if (peerId == null) { + return false; + } + ReplicationPeer peer = replicationPeers.getPeer(peerId); + if (peer == null) { + return false; + } + return peer.getSyncReplicationState() == state; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f8234b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java new file mode 100644 index 0000000..64216cb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerMappingManager.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Used to map a region to its sync replication peer id. + * <p> + * TODO: currently only supports the include table options.
+ */ +@InterfaceAudience.Private +class SyncReplicationPeerMappingManager { + + private final ConcurrentMap<TableName, String> table2PeerId = new ConcurrentHashMap<>(); + + void add(String peerId, ReplicationPeerConfig peerConfig) { + peerConfig.getTableCFsMap().keySet().forEach(tn -> table2PeerId.put(tn, peerId)); + } + + void remove(String peerId, ReplicationPeerConfig peerConfig) { + peerConfig.getTableCFsMap().keySet().forEach(table2PeerId::remove); + } + + String getPeerId(RegionInfo info) { + return table2PeerId.get(info.getTable()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f8234b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java deleted file mode 100644 index b97bf7e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; - -import java.util.Optional; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Get the peer id and remote root dir if the region is synchronously replicated. - */ -@InterfaceAudience.Private -public interface SyncReplicationPeerProvider { - - /** - * Return the peer id and remote WAL directory if the region is synchronously replicated. - */ - Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f8234b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index bccc842..e3de6b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener; -import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider; +import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.Pair; @@ -67,7 +67,7 @@ public class SyncReplicationWALProvider implements 
WALProvider, PeerActionListen private final WALProvider provider; - private final SyncReplicationPeerProvider peerProvider; + private SyncReplicationPeerInfoProvider peerInfoProvider; private WALFactory factory; @@ -85,9 +85,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen private final KeyLocker<String> createLock = new KeyLocker<>(); - SyncReplicationWALProvider(WALProvider provider, SyncReplicationPeerProvider peerProvider) { + SyncReplicationWALProvider(WALProvider provider) { this.provider = provider; - this.peerProvider = peerProvider; + } + + public void setPeerInfoProvider(SyncReplicationPeerInfoProvider peerInfoProvider) { + this.peerInfoProvider = peerInfoProvider; } @Override @@ -99,7 +102,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen this.conf = conf; this.factory = factory; Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass = - NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); + NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); channelClass = eventLoopGroupAndChannelClass.getSecond(); } @@ -112,9 +115,9 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen Path remoteWALDirPath = new Path(remoteWALDir); FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf); return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs, - CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId), - getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), - conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass); + CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId), + getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), + conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass); } private 
DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException { @@ -139,7 +142,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen @Override public WAL getWAL(RegionInfo region) throws IOException { Optional<Pair<String, String>> peerIdAndRemoteWALDir = - peerProvider.getPeerIdAndRemoteWALDir(region); + peerInfoProvider.getPeerIdAndRemoteWALDir(region); if (peerIdAndRemoteWALDir.isPresent()) { Pair<String, String> pair = peerIdAndRemoteWALDir.get(); return getWAL(pair.getFirst(), pair.getSecond()); @@ -221,14 +224,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen } @Override - public void peerRemoved(String peerId) { - safeClose(peerId2WAL.remove(peerId)); - } - - @Override public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from, - SyncReplicationState to) { - assert to == SyncReplicationState.DOWNGRADE_ACTIVE; - safeClose(peerId2WAL.remove(peerId)); + SyncReplicationState to, int stage) { + // TODO: stage 0 + if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE && + stage == 1) { + safeClose(peerId2WAL.remove(peerId)); + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f8234b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 06999ea..202b584 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -24,10 +24,10 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import 
org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; -import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; @@ -143,18 +143,6 @@ public class WALFactory { } /** - * instantiate a provider from a config property. requires conf to have already been set (as well - * as anything the provider might need to read). - */ - private WALProvider getProvider(String key, String defaultValue, String providerId) - throws IOException { - WALProvider provider = createProvider(getProviderClass(key, defaultValue)); - provider.init(this, conf, providerId); - provider.addWALActionsListener(new MetricsWAL()); - return provider; - } - - /** * @param conf must not be null, will keep a reference to read params in later reader/writer * instances. * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations @@ -171,7 +159,13 @@ public class WALFactory { this.factoryId = factoryId; // end required early initialization if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { - provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null); + WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); + if (conf.getBoolean(HConstants.SYNC_REPLICATION_ENABLED, false)) { + provider = new SyncReplicationWALProvider(provider); + } + provider.init(this, conf, null); + provider.addWALActionsListener(new MetricsWAL()); + this.provider = provider; } else { // special handling of existing configuration behavior. LOG.warn("Running with WAL disabled."); @@ -181,26 +175,6 @@ public class WALFactory { } /** - * A temporary constructor for testing synchronous replication. 
- * <p> - * Remove it once we can integrate the synchronous replication logic in RS. - */ - @VisibleForTesting - WALFactory(Configuration conf, String factoryId, SyncReplicationPeerProvider peerProvider) - throws IOException { - timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); - /* TODO Both of these are probably specific to the fs wal provider */ - logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, - AbstractFSWALProvider.Reader.class); - this.conf = conf; - this.factoryId = factoryId; - WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); - this.provider = new SyncReplicationWALProvider(provider, peerProvider); - this.provider.init(this, conf, null); - this.provider.addWALActionsListener(new MetricsWAL()); - } - - /** * Shutdown all WALs and clean up any underlying storage. * Use only when you will not need to replay and edits that have gone to any wals from this * factory. @@ -248,8 +222,9 @@ public class WALFactory { if (provider != null) { return provider; } - provider = getProvider(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER, - AbstractFSWALProvider.META_WAL_PROVIDER_ID); + provider = createProvider(getProviderClass(META_WAL_PROVIDER, DEFAULT_META_WAL_PROVIDER)); + provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID); + provider.addWALActionsListener(new MetricsWAL()); if (metaProvider.compareAndSet(null, provider)) { return provider; } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f8234b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index d462dbd..0ad476f 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterR import org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -1008,7 +1009,7 @@ public class TestReplicationAdmin { @Test public void testTransitSyncReplicationPeerState() throws Exception { TableName tableName = TableName.valueOf(name.getMethodName()); - + TEST_UTIL.createTable(tableName, Bytes.toBytes("family")); ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); builder.setClusterKey(KEY_ONE); builder.setReplicateAllUserTables(false); http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f8234b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index b058da3..482f49a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -175,7 +175,10 @@ public abstract class TestReplicationSourceManager { ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/sync-rep-state"); 
ZKUtil.setData(zkw, "/hbase/replication/peers/1/sync-rep-state", - SyncReplicationState.toByteArray(SyncReplicationState.NONE)); + ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES); + ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/new-sync-rep-state"); + ZKUtil.setData(zkw, "/hbase/replication/peers/1/new-sync-rep-state", + ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES); ZKUtil.createWithParents(zkw, "/hbase/replication/state"); ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f8234b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java index f09e51e..986228c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java @@ -27,6 +27,7 @@ import java.util.Optional; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.RegionInfo; @@ -35,6 +36,8 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import 
org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -51,7 +54,7 @@ public class TestSyncReplicationWALProvider { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class); + HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); @@ -69,19 +72,30 @@ public class TestSyncReplicationWALProvider { private static WALFactory FACTORY; - private static Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) { - if (info.getTable().equals(TABLE)) { - return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR)); - } else { - return Optional.empty(); + public static final class InfoProvider implements SyncReplicationPeerInfoProvider { + + @Override + public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) { + if (info.getTable().equals(TABLE)) { + return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR)); + } else { + return Optional.empty(); + } + } + + @Override + public boolean isInState(RegionInfo info, SyncReplicationState state) { + // TODO Implement SyncReplicationPeerInfoProvider.isInState + return false; } } @BeforeClass public static void setUpBeforeClass() throws Exception { + UTIL.getConfiguration().setBoolean(HConstants.SYNC_REPLICATION_ENABLED, true); UTIL.startMiniDFSCluster(3); - FACTORY = new WALFactory(UTIL.getConfiguration(), "test", - TestSyncReplicationWALProvider::getPeerIdAndRemoteWALDir); + FACTORY = new WALFactory(UTIL.getConfiguration(), "test"); + ((SyncReplicationWALProvider) FACTORY.getWALProvider()).setPeerInfoProvider(new InfoProvider()); UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID)); } @@ -151,9 +165,9 @@ public class 
TestSyncReplicationWALProvider { DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION); assertEquals(2, FACTORY.getWALs().size()); testReadWrite(wal); - SyncReplicationWALProvider walProvider = - (SyncReplicationWALProvider) FACTORY.getWALProvider(); - walProvider.peerRemoved(PEER_ID); + SyncReplicationWALProvider walProvider = (SyncReplicationWALProvider) FACTORY.getWALProvider(); + walProvider.peerSyncReplicationStateChange(PEER_ID, SyncReplicationState.ACTIVE, + SyncReplicationState.DOWNGRADE_ACTIVE, 1); assertEquals(1, FACTORY.getWALs().size()); } }