HBASE-19857 Complete the procedure for adding a sync replication peer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cfe74b9d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cfe74b9d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cfe74b9d

Branch: refs/heads/HBASE-19064
Commit: cfe74b9d0331f972f82e06877045c2ff4695c2e3
Parents: 8a9e63a
Author: zhangduo <zhang...@apache.org>
Authored: Thu Jan 25 20:09:00 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Tue Jan 30 09:55:47 2018 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationPeer.java      |   9 +
 .../hbase/replication/ReplicationPeerImpl.java  |  28 +--
 .../hbase/replication/ReplicationPeers.java     |   3 +-
 .../regionserver/PeerActionListener.java        |  10 +-
 .../SyncReplicationPeerProvider.java            |  35 +++
 .../SynchronousReplicationPeerProvider.java     |  35 ---
 .../hbase/wal/SyncReplicationWALProvider.java   | 229 +++++++++++++++++++
 .../wal/SynchronousReplicationWALProvider.java  | 220 ------------------
 .../org/apache/hadoop/hbase/wal/WALFactory.java |   6 +-
 .../TestReplicationSourceManager.java           |   3 +
 .../wal/TestSyncReplicationWALProvider.java     | 153 +++++++++++++
 .../TestSynchronousReplicationWALProvider.java  | 153 -------------
 12 files changed, 450 insertions(+), 434 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cfe74b9d/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index 2da3cce..0196a9a 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -54,6 +54,15 @@ public interface ReplicationPeer {
   PeerState getPeerState();

   /**
+   * Returns the sync replication state of the peer by reading local cache.
+   * <p>
+   * If the peer is not a synchronous replication peer, a {@link SyncReplicationState#NONE} will be
+   * returned.
+   * @return the sync replication state
+   */
+  SyncReplicationState getSyncReplicationState();
+
+  /**
    * Test whether the peer is enabled.
    * @return {@code true} if enabled, otherwise {@code false}.
*/ http://git-wip-us.apache.org/repos/asf/hbase/blob/cfe74b9d/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java index 604e0bb..5ec14cd 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java @@ -36,6 +36,8 @@ public class ReplicationPeerImpl implements ReplicationPeer { private volatile PeerState peerState; + private volatile SyncReplicationState syncReplicationState; + private final List<ReplicationPeerConfigListener> peerConfigListeners; /** @@ -45,12 +47,13 @@ public class ReplicationPeerImpl implements ReplicationPeer { * @param id string representation of this peer's identifier * @param peerConfig configuration for the replication peer */ - public ReplicationPeerImpl(Configuration conf, String id, boolean peerState, - ReplicationPeerConfig peerConfig) { + public ReplicationPeerImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig, + boolean peerState, SyncReplicationState syncReplicationState) { this.conf = conf; this.id = id; this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED; this.peerConfig = peerConfig; + this.syncReplicationState = syncReplicationState; this.peerConfigListeners = new ArrayList<>(); } @@ -77,37 +80,26 @@ public class ReplicationPeerImpl implements ReplicationPeer { return peerState; } - /** - * Get the peer config object - * @return the ReplicationPeerConfig for this peer - */ + @Override + public SyncReplicationState getSyncReplicationState() { + return syncReplicationState; + } + @Override public ReplicationPeerConfig getPeerConfig() { return peerConfig; } - /** - * Get the configuration object required to communicate with this peer - * @return configuration object - */ @Override public Configuration getConfiguration() { return conf; } - /** - * Get replicable (table, cf-list) map of this peer - * @return the replicable (table, cf-list) map - */ @Override public Map<TableName, List<String>> getTableCFs() { return this.peerConfig.getTableCFsMap(); } - /** - * Get replicable namespace set of this peer - * @return the replicable namespaces set - */ @Override public Set<String> getNamespaces() { return this.peerConfig.getNamespaces(); http://git-wip-us.apache.org/repos/asf/hbase/blob/cfe74b9d/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index eacb2f4..f120dbc 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -129,7 +129,8 @@ public class ReplicationPeers { private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException { ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); boolean enabled = peerStorage.isPeerEnabled(peerId); + SyncReplicationState syncReplicationState = 
peerStorage.getPeerSyncReplicationState(peerId); return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf), - peerId, enabled, peerConfig); + peerId, peerConfig, enabled, syncReplicationState); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/cfe74b9d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java index 74ad626..6df2af9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java @@ -17,17 +17,19 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.yetus.audience.InterfaceAudience; /** * Get notification for replication peer events. Mainly used for telling the - * {@link org.apache.hadoop.hbase.wal.SynchronousReplicationWALProvider} to close some WAL if not - * used any more. - * <p> - * TODO: Also need a synchronous peer state change notification. + * {@link org.apache.hadoop.hbase.wal.SyncReplicationWALProvider} to close some WAL if not used any + * more. */ @InterfaceAudience.Private public interface PeerActionListener { default void peerRemoved(String peerId) {} + + default void peerSyncReplicationStateChange(String peerId, SyncReplicationState from, + SyncReplicationState to) {} } http://git-wip-us.apache.org/repos/asf/hbase/blob/cfe74b9d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java new file mode 100644 index 0000000..b97bf7e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerProvider.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.Optional; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Get the peer id and remote root dir if the region is synchronously replicated. 
+ */ +@InterfaceAudience.Private +public interface SyncReplicationPeerProvider { + + /** + * Return the peer id and remote WAL directory if the region is synchronously replicated. + */ + Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/cfe74b9d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java deleted file mode 100644 index b4e04fb..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SynchronousReplicationPeerProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; - -import java.util.Optional; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Get the peer id and remote root dir if the region is synchronously replicated. - */ -@InterfaceAudience.Private -public interface SynchronousReplicationPeerProvider { - - /** - * Return the peer id and remote WAL directory if the region is synchronously replicated. - */ - Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info); -} http://git-wip-us.apache.org/repos/asf/hbase/blob/cfe74b9d/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java new file mode 100644 index 0000000..fea49cf --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDirectoryName; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener; +import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.KeyLocker; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Streams; +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; + +/** + * The special {@link WALProvider} for synchronous replication. + * <p> + * It works like an interceptor, when getting WAL, first it will check if the given region should be + * replicated synchronously, if so it will return a special WAL for it, otherwise it will delegate + * the request to the normal {@link WALProvider}. + */ +@InterfaceAudience.Private +public class SyncReplicationWALProvider implements WALProvider, PeerActionListener { + + private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class); + + private static final String LOG_SUFFIX = ".syncrep"; + + private final WALProvider provider; + + private final SyncReplicationPeerProvider peerProvider; + + private WALFactory factory; + + private Configuration conf; + + private List<WALActionsListener> listeners; + + private EventLoopGroup eventLoopGroup; + + private Class<? 
extends Channel> channelClass; + + private AtomicBoolean initialized = new AtomicBoolean(false); + + private final ConcurrentMap<String, DualAsyncFSWAL> peerId2WAL = new ConcurrentHashMap<>(); + + private final KeyLocker<String> createLock = new KeyLocker<>(); + + SyncReplicationWALProvider(WALProvider provider, SyncReplicationPeerProvider peerProvider) { + this.provider = provider; + this.peerProvider = peerProvider; + } + + @Override + public void init(WALFactory factory, Configuration conf, List<WALActionsListener> listeners, + String providerId) throws IOException { + if (!initialized.compareAndSet(false, true)) { + throw new IllegalStateException("WALProvider.init should only be called once."); + } + provider.init(factory, conf, listeners, providerId); + this.conf = conf; + this.factory = factory; + this.listeners = listeners; + Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass = + NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); + eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); + channelClass = eventLoopGroupAndChannelClass.getSecond(); + } + + private String getLogPrefix(String peerId) { + return factory.factoryId + WAL_FILE_NAME_DELIMITER + peerId; + } + + private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException { + Path remoteWALDirPath = new Path(remoteWALDir); + FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf); + return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs, + CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId), + getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), + conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass); + } + + private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException { + DualAsyncFSWAL wal = peerId2WAL.get(peerId); + if (wal != null) { + return wal; + } + Lock lock = createLock.acquireLock(peerId); + try { + wal = peerId2WAL.get(peerId); + if (wal == null) { + wal = createWAL(peerId, remoteWALDir); + peerId2WAL.put(peerId, wal); + wal.init(); + } + return wal; + } finally { + lock.unlock(); + } + } + + @Override + public WAL getWAL(RegionInfo region) throws IOException { + Optional<Pair<String, String>> peerIdAndRemoteWALDir = + peerProvider.getPeerIdAndRemoteWALDir(region); + if (peerIdAndRemoteWALDir.isPresent()) { + Pair<String, String> pair = peerIdAndRemoteWALDir.get(); + return getWAL(pair.getFirst(), pair.getSecond()); + } else { + return provider.getWAL(region); + } + } + + private Stream<WAL> getWALStream() { + return Streams.concat(peerId2WAL.values().stream(), provider.getWALs().stream()); + } + + @Override + public List<WAL> getWALs() { + return getWALStream().collect(Collectors.toList()); + } + + @Override + public void shutdown() throws IOException { + // save the last exception and rethrow + IOException failure = null; + for (DualAsyncFSWAL wal : peerId2WAL.values()) { + try { + wal.shutdown(); + } catch (IOException e) { + LOG.error("Shutdown WAL failed", e); + failure = e; + } + } + provider.shutdown(); + if (failure != null) { + throw failure; + } + } + + @Override + public void close() throws IOException { + // save the last exception and rethrow + IOException failure = null; + for (DualAsyncFSWAL wal : peerId2WAL.values()) { + try { + wal.close(); + } catch (IOException e) { + LOG.error("Close WAL failed", e); + failure = e; + } + } + provider.close(); + if (failure != null) { + throw failure; + } + } + + 
@Override + public long getNumLogFiles() { + return peerId2WAL.size() + provider.getNumLogFiles(); + } + + @Override + public long getLogFileSize() { + return peerId2WAL.values().stream().mapToLong(DualAsyncFSWAL::getLogFileSize).sum() + + provider.getLogFileSize(); + } + + private void safeClose(WAL wal) { + if (wal != null) { + try { + wal.close(); + } catch (IOException e) { + LOG.error("Close WAL failed", e); + } + } + } + + @Override + public void peerRemoved(String peerId) { + safeClose(peerId2WAL.remove(peerId)); + } + + @Override + public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from, + SyncReplicationState to) { + assert to == SyncReplicationState.DOWNGRADE_ACTIVE; + safeClose(peerId2WAL.remove(peerId)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/cfe74b9d/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java deleted file mode 100644 index de1b56d..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SynchronousReplicationWALProvider.java +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.wal; - -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDirectoryName; -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName; - -import java.io.IOException; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener; -import org.apache.hadoop.hbase.replication.regionserver.SynchronousReplicationPeerProvider; -import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.KeyLocker; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.collect.Streams; -import org.apache.hbase.thirdparty.io.netty.channel.Channel; -import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; - -/** - * The special {@link WALProvider} for synchronous replication. - * <p> - * It works like an interceptor, when getting WAL, first it will check if the given region should be - * replicated synchronously, if so it will return a special WAL for it, otherwise it will delegate - * the request to the normal {@link WALProvider}. - */ -@InterfaceAudience.Private -public class SynchronousReplicationWALProvider implements WALProvider, PeerActionListener { - - private static final Logger LOG = - LoggerFactory.getLogger(SynchronousReplicationWALProvider.class); - - private static final String LOG_SUFFIX = ".syncrep"; - - private final WALProvider provider; - - private final SynchronousReplicationPeerProvider peerProvider; - - private WALFactory factory; - - private Configuration conf; - - private List<WALActionsListener> listeners; - - private EventLoopGroup eventLoopGroup; - - private Class<? extends Channel> channelClass; - - private AtomicBoolean initialized = new AtomicBoolean(false); - - private final ConcurrentMap<String, DualAsyncFSWAL> peerId2WAL = new ConcurrentHashMap<>(); - - private final KeyLocker<String> createLock = new KeyLocker<>(); - - SynchronousReplicationWALProvider(WALProvider provider, - SynchronousReplicationPeerProvider peerProvider) { - this.provider = provider; - this.peerProvider = peerProvider; - } - - @Override - public void init(WALFactory factory, Configuration conf, List<WALActionsListener> listeners, - String providerId) throws IOException { - if (!initialized.compareAndSet(false, true)) { - throw new IllegalStateException("WALProvider.init should only be called once."); - } - provider.init(factory, conf, listeners, providerId); - this.conf = conf; - this.factory = factory; - this.listeners = listeners; - Pair<EventLoopGroup, Class<? 
extends Channel>> eventLoopGroupAndChannelClass = - NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); - eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); - channelClass = eventLoopGroupAndChannelClass.getSecond(); - } - - private String getLogPrefix(String peerId) { - return factory.factoryId + WAL_FILE_NAME_DELIMITER + peerId; - } - - private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException { - Path remoteWALDirPath = new Path(remoteWALDir); - FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf); - return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs, - CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId), - getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), - conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass); - } - - private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException { - DualAsyncFSWAL wal = peerId2WAL.get(peerId); - if (wal != null) { - return wal; - } - Lock lock = createLock.acquireLock(peerId); - try { - wal = peerId2WAL.get(peerId); - if (wal == null) { - wal = createWAL(peerId, remoteWALDir); - peerId2WAL.put(peerId, wal); - wal.init(); - } - return wal; - } finally { - lock.unlock(); - } - } - - @Override - public WAL getWAL(RegionInfo region) throws IOException { - Optional<Pair<String, String>> peerIdAndRemoteWALDir = - peerProvider.getPeerIdAndRemoteWALDir(region); - if (peerIdAndRemoteWALDir.isPresent()) { - Pair<String, String> pair = peerIdAndRemoteWALDir.get(); - return getWAL(pair.getFirst(), pair.getSecond()); - } else { - return provider.getWAL(region); - } - } - - private Stream<WAL> getWALStream() { - return Streams.concat(peerId2WAL.values().stream(), provider.getWALs().stream()); - } - - @Override - public List<WAL> getWALs() { - return getWALStream().collect(Collectors.toList()); - } - - @Override - public void shutdown() throws IOException { - // save the last exception and rethrow - IOException failure = null; - for (DualAsyncFSWAL wal : peerId2WAL.values()) { - try { - wal.shutdown(); - } catch (IOException e) { - LOG.error("Shutdown WAL failed", e); - failure = e; - } - } - provider.shutdown(); - if (failure != null) { - throw failure; - } - } - - @Override - public void close() throws IOException { - // save the last exception and rethrow - IOException failure = null; - for (DualAsyncFSWAL wal : peerId2WAL.values()) { - try { - wal.close(); - } catch (IOException e) { - LOG.error("Close WAL failed", e); - failure = e; - } - } - provider.close(); - if (failure != null) { - throw failure; - } - } - - @Override - public long getNumLogFiles() { - return peerId2WAL.size() + provider.getNumLogFiles(); - } - - @Override - public long getLogFileSize() { - return peerId2WAL.values().stream().mapToLong(DualAsyncFSWAL::getLogFileSize).sum() + - provider.getLogFileSize(); - } - - @Override - public void peerRemoved(String peerId) { - WAL wal = peerId2WAL.remove(peerId); - if (wal != null) { - try { - wal.close(); - } catch (IOException e) { - LOG.error("Close WAL failed", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/cfe74b9d/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index fff1066..0e4d002 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.replication.regionserver.SynchronousReplicationPeerProvider; +import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider; import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -193,7 +193,7 @@ public class WALFactory implements WALFileLengthProvider { */ @VisibleForTesting WALFactory(Configuration conf, List<WALActionsListener> listeners, String factoryId, - SynchronousReplicationPeerProvider peerProvider) throws IOException { + SyncReplicationPeerProvider peerProvider) throws IOException { timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); /* TODO Both of these are probably specific to the fs wal provider */ logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, @@ -201,7 +201,7 @@ public class WALFactory implements WALFileLengthProvider { this.conf = conf; this.factoryId = factoryId; WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); - this.provider = new SynchronousReplicationWALProvider(provider, peerProvider); + this.provider = new SyncReplicationWALProvider(provider, peerProvider); this.provider.init(this, conf, listeners, null); } http://git-wip-us.apache.org/repos/asf/hbase/blob/cfe74b9d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index e50d90f..38eefc1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -170,6 +170,9 @@ public abstract class TestReplicationSourceManager { ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); + ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/sync-rep-state"); + ZKUtil.setData(zkw, "/hbase/replication/peers/1/sync-rep-state", + Bytes.toBytes(SyncReplicationState.NONE.ordinal())); ZKUtil.createWithParents(zkw, "/hbase/replication/state"); ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); http://git-wip-us.apache.org/repos/asf/hbase/blob/cfe74b9d/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java new file mode 100644 
index 0000000..59117e7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.util.Optional; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestSyncReplicationWALProvider { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static String PEER_ID = "1"; + + private static String REMOTE_WAL_DIR = "/RemoteWAL"; + + private static TableName TABLE = TableName.valueOf("table"); + + private static TableName TABLE_NO_REP = TableName.valueOf("table-no-rep"); + + private static RegionInfo REGION = RegionInfoBuilder.newBuilder(TABLE).build(); + + private static RegionInfo REGION_NO_REP = RegionInfoBuilder.newBuilder(TABLE_NO_REP).build(); + + private static WALFactory FACTORY; + + private static Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) { + if (info.getTable().equals(TABLE)) { + return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR)); + } else { + return Optional.empty(); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniDFSCluster(3); + FACTORY = new WALFactory(UTIL.getConfiguration(), null, "test", + TestSyncReplicationWALProvider::getPeerIdAndRemoteWALDir); + UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID)); + } + + @AfterClass + public static void tearDownAfterClass() throws IOException { + FACTORY.close(); + 
UTIL.shutdownMiniDFSCluster(); + } + + private void testReadWrite(DualAsyncFSWAL wal) throws Exception { + int recordCount = 100; + int columnCount = 10; + byte[] row = Bytes.toBytes("testRow"); + long timestamp = System.currentTimeMillis(); + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + ProtobufLogTestHelper.doWrite(wal, REGION, TABLE, columnCount, recordCount, row, timestamp, + mvcc); + Path localFile = wal.getCurrentFileName(); + Path remoteFile = new Path(REMOTE_WAL_DIR + "/" + PEER_ID, localFile.getName()); + try (ProtobufLogReader reader = + (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) { + ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row, + timestamp); + } + try (ProtobufLogReader reader = + (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) { + ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row, + timestamp); + } + wal.rollWriter(); + DistributedFileSystem dfs = (DistributedFileSystem) UTIL.getDFSCluster().getFileSystem(); + UTIL.waitFor(5000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + return dfs.isFileClosed(localFile) && dfs.isFileClosed(remoteFile); + } + + @Override + public String explainFailure() throws Exception { + StringBuilder sb = new StringBuilder(); + if (!dfs.isFileClosed(localFile)) { + sb.append(localFile + " has not been closed yet."); + } + if (!dfs.isFileClosed(remoteFile)) { + sb.append(remoteFile + " has not been closed yet."); + } + return sb.toString(); + } + }); + try (ProtobufLogReader reader = + (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) { + ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row, + timestamp); + } + try (ProtobufLogReader reader = + (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) { + ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row, + timestamp); + } + } + + @Test + public void test() throws Exception { + WAL walNoRep = FACTORY.getWAL(REGION_NO_REP); + assertThat(walNoRep, not(instanceOf(DualAsyncFSWAL.class))); + DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION); + assertEquals(2, FACTORY.getWALs().size()); + testReadWrite(wal); + SyncReplicationWALProvider walProvider = + (SyncReplicationWALProvider) FACTORY.getWALProvider(); + walProvider.peerRemoved(PEER_ID); + assertEquals(1, FACTORY.getWALs().size()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/cfe74b9d/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java deleted file mode 100644 index da686b1..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSynchronousReplicationWALProvider.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.wal; - -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.not; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import java.io.IOException; -import java.util.Optional; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.RegionInfoBuilder; -import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; -import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; -import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ RegionServerTests.class, MediumTests.class }) -public class TestSynchronousReplicationWALProvider { - - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - - private static String PEER_ID = "1"; - - private static String REMOTE_WAL_DIR = "/RemoteWAL"; - - private static TableName TABLE = TableName.valueOf("table"); - - private static TableName TABLE_NO_REP = TableName.valueOf("table-no-rep"); - - private static RegionInfo REGION = RegionInfoBuilder.newBuilder(TABLE).build(); - - private static RegionInfo REGION_NO_REP = RegionInfoBuilder.newBuilder(TABLE_NO_REP).build(); - - private static WALFactory FACTORY; - - private static Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(RegionInfo info) { - if (info.getTable().equals(TABLE)) { - return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR)); - } else { - return Optional.empty(); - } - } - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - UTIL.startMiniDFSCluster(3); - FACTORY = new WALFactory(UTIL.getConfiguration(), null, "test", - TestSynchronousReplicationWALProvider::getPeerIdAndRemoteWALDir); - UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID)); - } - - @AfterClass - public static void tearDownAfterClass() throws IOException { - FACTORY.close(); - UTIL.shutdownMiniDFSCluster(); - } - - private void testReadWrite(DualAsyncFSWAL wal) throws Exception { - int recordCount = 100; - int columnCount = 10; - byte[] row = Bytes.toBytes("testRow"); - long timestamp = System.currentTimeMillis(); - MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); - ProtobufLogTestHelper.doWrite(wal, REGION, 
TABLE, columnCount, recordCount, row, timestamp, - mvcc); - Path localFile = wal.getCurrentFileName(); - Path remoteFile = new Path(REMOTE_WAL_DIR + "/" + PEER_ID, localFile.getName()); - try (ProtobufLogReader reader = - (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) { - ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row, - timestamp); - } - try (ProtobufLogReader reader = - (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) { - ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row, - timestamp); - } - wal.rollWriter(); - DistributedFileSystem dfs = (DistributedFileSystem) UTIL.getDFSCluster().getFileSystem(); - UTIL.waitFor(5000, new ExplainingPredicate<Exception>() { - - @Override - public boolean evaluate() throws Exception { - return dfs.isFileClosed(localFile) && dfs.isFileClosed(remoteFile); - } - - @Override - public String explainFailure() throws Exception { - StringBuilder sb = new StringBuilder(); - if (!dfs.isFileClosed(localFile)) { - sb.append(localFile + " has not been closed yet."); - } - if (!dfs.isFileClosed(remoteFile)) { - sb.append(remoteFile + " has not been closed yet."); - } - return sb.toString(); - } - }); - try (ProtobufLogReader reader = - (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) { - ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row, - timestamp); - } - try (ProtobufLogReader reader = - (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) { - ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row, - timestamp); - } - } - - @Test - public void test() throws Exception { - WAL walNoRep = FACTORY.getWAL(REGION_NO_REP); - assertThat(walNoRep, not(instanceOf(DualAsyncFSWAL.class))); - DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION); - assertEquals(2, FACTORY.getWALs().size()); - testReadWrite(wal); - SynchronousReplicationWALProvider walProvider = - (SynchronousReplicationWALProvider) FACTORY.getWALProvider(); - walProvider.peerRemoved(PEER_ID); - assertEquals(1, FACTORY.getWALs().size()); - } -}
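
----------------------------------------------------------------------

The new TestSyncReplicationWALProvider above already demonstrates the intended
wiring: WALFactory wraps the configured WALProvider in a SyncReplicationWALProvider,
which asks a SyncReplicationPeerProvider, per region, whether writes should go
through the dual-cluster WAL. The sketch below is illustrative only, not part of
the commit: the table name, peer id, remote directory, and the variables conf,
regionInfo and otherRegion are assumed, and the four-argument WALFactory
constructor is the package-private @VisibleForTesting one used by the test, so
code like this would have to live in the org.apache.hadoop.hbase.wal package.

  import java.util.Optional;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerProvider;
  import org.apache.hadoop.hbase.util.Pair;

  // Decide, per region, whether it is synchronously replicated and, if so,
  // which peer id and remote WAL directory to use. The table name, peer id
  // and directory here are made-up examples.
  SyncReplicationPeerProvider peerProvider = info ->
      info.getTable().equals(TableName.valueOf("synced-table"))
          ? Optional.of(Pair.newPair("1", "/remoteWALs"))
          : Optional.empty();

  // @VisibleForTesting constructor: the factory wraps the normal provider
  // in a SyncReplicationWALProvider that intercepts getWAL(RegionInfo).
  WALFactory factory = new WALFactory(conf, null, "factoryId", peerProvider);

  // Regions the provider recognizes get a DualAsyncFSWAL writing both the
  // local WAL and the remote cluster's WAL directory; every other region is
  // delegated to the wrapped WALProvider unchanged.
  WAL syncedWal = factory.getWAL(regionInfo);
  WAL normalWal = factory.getWAL(otherRegion);

When the peer is removed, or its sync replication state drops to
DOWNGRADE_ACTIVE, the provider's peerRemoved / peerSyncReplicationStateChange
callbacks (see PeerActionListener above) close and evict the cached
DualAsyncFSWAL, which is exactly what the test exercises via
walProvider.peerRemoved(PEER_ID).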