HBASE-20370 Also remove the wal file in remote cluster when we finish replicating a file
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c3b02fbe
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c3b02fbe
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c3b02fbe

Branch: refs/heads/HBASE-19064
Commit: c3b02fbe386a240bac12236d99633fc0ca462c3d
Parents: cd151f1
Author: zhangduo <zhang...@apache.org>
Authored: Tue Apr 17 09:04:56 2018 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Wed May 30 21:01:01 2018 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationUtils.java     |  36 ++++++-
 .../regionserver/ReplicationSource.java         |  38 +++----
 .../ReplicationSourceInterface.java             |  21 +++-
 .../regionserver/ReplicationSourceManager.java  | 108 ++++++++++++++-----
 .../regionserver/ReplicationSourceShipper.java  |  27 ++---
 .../hbase/wal/SyncReplicationWALProvider.java   |  11 +-
 .../replication/ReplicationSourceDummy.java     |  20 ++--
 .../TestReplicationSourceManager.java           | 101 ++++++++++++-----
 8 files changed, 246 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index cb22f57..66e9b01 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -22,14 +22,17 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Helper class for replication.
@@ -37,6 +40,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public final class ReplicationUtils {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationUtils.class);
+
   public static final String REPLICATION_ATTR_NAME = "__rep__";
 
   public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
@@ -176,4 +181,33 @@ public final class ReplicationUtils {
       return tableCFs != null && tableCFs.containsKey(tableName);
     }
   }
+
+  public static FileSystem getRemoteWALFileSystem(Configuration conf, String remoteWALDir)
+      throws IOException {
+    return new Path(remoteWALDir).getFileSystem(conf);
+  }
+
+  public static Path getRemoteWALDirForPeer(String remoteWALDir, String peerId) {
+    return new Path(remoteWALDir, peerId);
+  }
+
+  /**
+   * Do the sleeping logic
+   * @param msg Why we sleep
+   * @param sleepForRetries the base sleep time.
+   * @param sleepMultiplier by how many times the default sleeping time is augmented
+   * @param maxRetriesMultiplier the max retry multiplier
+   * @return True if <code>sleepMultiplier</code> is < <code>maxRetriesMultiplier</code>
+   */
+  public static boolean sleepForRetries(String msg, long sleepForRetries, int sleepMultiplier,
+      int maxRetriesMultiplier) {
+    try {
+      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
+      Thread.sleep(sleepForRetries * sleepMultiplier);
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted while sleeping between retries");
+      Thread.currentThread().interrupt();
+    }
+    return sleepMultiplier < maxRetriesMultiplier;
+  }
 }
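
The helpers above are shared by the WAL provider and the source manager further down; sleepForRetries in particular is lifted out of ReplicationSourceShipper so the manager's remote-WAL deletion loop can reuse the same backoff. A minimal sketch of the calling convention, as a fragment where keepRunning()/retryableWork() are hypothetical stand-ins for caller logic:

    // Retry while active, sleeping base * multiplier between attempts; the helper
    // returns true while the multiplier is still below the cap, so the sleep time
    // grows linearly and then plateaus at sleepForRetries * maxRetriesMultiplier.
    int sleepMultiplier = 1;
    while (keepRunning()) {
      try {
        retryableWork();
        break;
      } catch (IOException e) {
        if (ReplicationUtils.sleepForRetries("work failed, retrying", 1000, sleepMultiplier, 60)) {
          sleepMultiplier++;
        }
      }
    }
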
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 4051efe..f25a232 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -89,8 +89,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
   protected Configuration conf;
   protected ReplicationQueueInfo replicationQueueInfo;
-  // id of the peer cluster this source replicates to
-  private String peerId;
 
   // The manager of all sources to which we ping back our progress
   protected ReplicationSourceManager manager;
@@ -168,8 +166,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.queueId = queueId;
     this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
-    // ReplicationQueueInfo parses the peerId out of the znode for us
-    this.peerId = this.replicationQueueInfo.getPeerId();
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
 
     defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
@@ -177,8 +173,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
     this.totalBufferUsed = manager.getTotalBufferUsed();
     this.walFileLengthProvider = walFileLengthProvider;
-    LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId
-        + ", currentBandwidth=" + this.currentBandwidth);
+    LOG.info("queueId={}, ReplicationSource : {}, currentBandwidth={}", queueId,
+      replicationPeer.getId(), this.currentBandwidth);
   }
 
   private void decorateConf() {
@@ -215,6 +211,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
   @Override
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
+    String peerId = replicationPeer.getId();
     Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
     if (tableCFMap != null) {
       List<String> tableCfs = tableCFMap.get(tableName);
@@ -274,8 +271,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
       tableDescriptors = ((HRegionServer) server).getTableDescriptors();
     }
     replicationEndpoint
-      .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
-        clusterId, replicationPeer, metrics, tableDescriptors, server));
+      .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs,
+        replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
     replicationEndpoint.start();
     replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
   }
@@ -357,8 +354,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
     if (peerBandwidth != currentBandwidth) {
       currentBandwidth = peerBandwidth;
       throttler.setBandwidth((double) currentBandwidth / 10.0);
-      LOG.info("ReplicationSource : " + peerId
-          + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
+      LOG.info("ReplicationSource : {} bandwidth throttling changed, currentBandWidth={}",
+        replicationPeer.getId(), currentBandwidth);
     }
   }
 
@@ -387,15 +384,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return sleepMultiplier < maxRetriesMultiplier;
   }
 
-  /**
-   * check whether the peer is enabled or not
-   * @return true if the peer is enabled, otherwise false
-   */
-  @Override
-  public boolean isPeerEnabled() {
-    return replicationPeer.isPeerEnabled();
-  }
-
   private void initialize() {
     int sleepMultiplier = 1;
     while (this.isSourceActive()) {
@@ -529,11 +517,6 @@
   }
 
   @Override
-  public String getPeerId() {
-    return this.peerId;
-  }
-
-  @Override
   public Path getCurrentPath() { // only for testing
     for (ReplicationSourceShipper worker : workerThreads.values()) {
@@ -616,6 +599,11 @@
     return server.getServerName();
   }
 
+  @Override
+  public ReplicationPeer getPeer() {
+    return replicationPeer;
+  }
+
   Server getServer() {
     return server;
   }
@@ -623,4 +611,6 @@
   ReplicationQueueStorage getQueueStorage() {
     return queueStorage;
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 090b465..3ce5bfe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -104,10 +104,17 @@
 
   /**
    * Get the id that the source is replicating to.
-   *
    * @return peer id
    */
-  String getPeerId();
+  default String getPeerId() {
+    return getPeer().getId();
+  }
+
+  /**
+   * Get the replication peer instance.
+   * @return the replication peer instance
+   */
+  ReplicationPeer getPeer();
 
   /**
    * Get a string representation of the current statistics
@@ -119,9 +126,17 @@
   /**
    * @return peer enabled or not
    */
-  boolean isPeerEnabled();
+  default boolean isPeerEnabled() {
+    return getPeer().isPeerEnabled();
+  }
 
   /**
+   * @return whether this is sync replication peer.
+   */
+  default boolean isSyncReplication() {
+    return getPeer().getPeerConfig().isSyncReplication();
+  }
+
+  /**
    * @return active or not
    */
   boolean isSourceActive();
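
With getPeer() as the single abstract accessor here, getPeerId(), isPeerEnabled() and isSyncReplication() cannot drift out of sync with the peer they describe, and test doubles only have to supply the peer itself. A sketch in the same Mockito style the updated tests below use (assuming a Mockito version that can invoke real default methods via thenCallRealMethod()):

    // Stub only getPeer(); the interface default resolves the id through it.
    ReplicationPeer peer = mock(ReplicationPeer.class);
    when(peer.getId()).thenReturn("1");
    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
    when(source.getPeer()).thenReturn(peer);
    when(source.getPeerId()).thenCallRealMethod();
    assertEquals("1", source.getPeerId()); // delegates to getPeer().getId()

This is the same simplification applied to ReplicationSource above, which drops its cached peerId field and its getPeerId()/isPeerEnabled() overrides.
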
http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 4212597..cbeba23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -85,20 +87,20 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * operations.</li>
  * <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
  * {@link #addPeer(String)}, {@link #removePeer(String)},
- * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
+ * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and {@link #preLogRoll(Path)}.
  * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
 * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
- * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
- * called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
+ * {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
+ * is called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
 * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
 * remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
- * case need synchronized is {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
+ * case need synchronized is {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link #preLogRoll(Path)}.</li>
 * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
 * modify it, {@link #removePeer(String)} ,
- * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
+ * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
 * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
- * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
+ * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
 * {@link ReplicationSourceInterface} firstly, then remove the wals from
 * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
@@ -154,9 +156,15 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   private final boolean replicationForBulkLoadDataEnabled;
 
-
   private AtomicLong totalBufferUsed = new AtomicLong();
 
+  // How long should we sleep for each retry when deleting remote wal files for sync replication
+  // peer.
+  private final long sleepForRetries;
+  // Maximum number of retries before taking bold actions when deleting remote wal files for sync
+  // replication peer.
+  private final int maxRetriesMultiplier;
+
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param queueStorage the interface for manipulating replication queues
@@ -204,8 +212,11 @@ public class ReplicationSourceManager implements ReplicationListener {
     tfb.setDaemon(true);
     this.executor.setThreadFactory(tfb.build());
     this.latestPaths = new HashSet<Path>();
-    replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
-      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+    this.replicationForBulkLoadDataEnabled = conf.getBoolean(
+      HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+    this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
+    this.maxRetriesMultiplier =
+      this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
   }
 
   /**
@@ -494,16 +505,15 @@ public class ReplicationSourceManager implements ReplicationListener {
   /**
    * This method will log the current position to storage. And also clean old logs from the
    * replication queue.
-   * @param queueId id of the replication queue
-   * @param queueRecovered indicates if this queue comes from another region server
+   * @param source the replication source
    * @param entryBatch the wal entry batch we just shipped
    */
-  public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered,
+  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
       WALEntryBatch entryBatch) {
     String fileName = entryBatch.getLastWalPath().getName();
-    abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
-      entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
-    cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
+    abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(),
+      source.getQueueId(), fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
+    cleanOldLogs(fileName, entryBatch.isEndOfFile(), source);
   }
 
   /**
@@ -511,36 +521,84 @@ public class ReplicationSourceManager implements ReplicationListener {
    * file is closed and has no more entries.
    * @param log Path to the log
    * @param inclusive whether we should also remove the given log file
-   * @param queueId id of the replication queue
-   * @param queueRecovered Whether this is a recovered queue
+   * @param source the replication source
    */
   @VisibleForTesting
-  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
+  void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
-    if (queueRecovered) {
-      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
+    if (source.isRecovered()) {
+      NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
       if (wals != null) {
-        cleanOldLogs(wals, log, inclusive, queueId);
+        cleanOldLogs(wals, log, inclusive, source);
       }
     } else {
       // synchronized on walsById to avoid race with preLogRoll
       synchronized (this.walsById) {
-        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
+        NavigableSet<String> wals = walsById.get(source.getQueueId()).get(logPrefix);
         if (wals != null) {
-          cleanOldLogs(wals, log, inclusive, queueId);
+          cleanOldLogs(wals, log, inclusive, source);
+        }
+      }
+    }
+  }
+
+  private void removeRemoteWALs(String peerId, String remoteWALDir, Set<String> wals)
+      throws IOException {
+    Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
+    FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+    for (String wal : wals) {
+      Path walFile = new Path(remoteWALDirForPeer, wal);
+      try {
+        if (!fs.delete(walFile, false) && fs.exists(walFile)) {
+          throw new IOException("Can not delete " + walFile);
         }
+      } catch (FileNotFoundException e) {
+        // Just ignore since this means the file has already been deleted.
+        // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting an
+        // inexistent file, so here we deal with both, i.e, check the return value of the
+        // FileSystem.delete, and also catch FNFE.
+        LOG.debug("The remote wal {} has already been deleted?", walFile, e);
      }
    }
  }
 
-  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
+  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive,
+      ReplicationSourceInterface source) {
     NavigableSet<String> walSet = wals.headSet(key, inclusive);
     if (walSet.isEmpty()) {
       return;
     }
     LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
+    // The intention here is that, we want to delete the remote wal files ASAP as it may effect the
+    // failover time if you want to transit the remote cluster from S to A. And the infinite retry
+    // is not a problem, as if we can not contact with the remote HDFS cluster, then usually we can
+    // not contact with the HBase cluster either, so the replication will be blocked either.
+    if (source.isSyncReplication()) {
+      String peerId = source.getPeerId();
+      String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
+      LOG.debug("Removing {} logs from remote dir {} in the list: {}", walSet.size(), remoteWALDir,
+        walSet);
+      for (int sleepMultiplier = 0;;) {
+        try {
+          removeRemoteWALs(peerId, remoteWALDir, walSet);
+          break;
+        } catch (IOException e) {
+          LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
+            peerId);
+        }
+        if (!source.isSourceActive()) {
+          // skip the following operations
+          return;
+        }
+        if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
+          sleepMultiplier, maxRetriesMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
+    }
+    String queueId = source.getQueueId();
     for (String wal : walSet) {
-      abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
+      abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), queueId, wal));
     }
     walSet.clear();
   }
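
The deletion retry cadence above comes from the two new properties read in the constructor, defaulting to a 1000 ms base sleep and a cap of 60. A hedged tuning sketch (the values here are arbitrary examples, not recommendations):

    // Same keys and defaults the ReplicationSourceManager constructor reads; once
    // the multiplier hits the cap, each further attempt waits base * cap.
    Configuration conf = HBaseConfiguration.create();
    conf.setLong("replication.source.sync.sleepforretries", 2000);
    conf.setInt("replication.source.sync.maxretriesmultiplier", 30);

Note the loop never gives up while the source is active: the cap only stops the sleep from growing, so a prolonged remote HDFS outage blocks old-WAL cleanup rather than silently leaving files behind on the remote cluster.

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 11fd660..3f97b5e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -91,7 +93,7 @@ public class ReplicationSourceShipper extends Thread {
       if (!source.isPeerEnabled()) {
         // The peer enabled check is in memory, not expensive, so do not need to increase the
         // sleep interval as it may cause a long lag when we enable the peer.
-        sleepForRetries("Replication is disabled", 1);
+        sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier);
         continue;
       }
       try {
@@ -189,7 +191,8 @@ public class ReplicationSourceShipper extends Thread {
     } catch (Exception ex) {
       LOG.warn("{} threw unknown exception:",
         source.getReplicationEndpoint().getClass().getName(), ex);
-      if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
+      if (sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
+        maxRetriesMultiplier)) {
         sleepMultiplier++;
       }
     }
@@ -228,8 +231,7 @@ public class ReplicationSourceShipper extends Thread {
       // position and the file will be removed soon in cleanOldLogs.
       if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
         batch.getLastWalPosition() != currentPosition) {
-        source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(),
-          source.isRecovered(), batch);
+        source.getSourceManager().logPositionAndCleanOldLogs(source, batch);
         updated = true;
       }
       // if end of file is true, then we can just skip to the next file in queue.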
@@ -282,21 +284,4 @@ public class ReplicationSourceShipper extends Thread {
   public boolean isFinished() {
     return state == WorkerState.FINISHED;
   }
-
-  /**
-   * Do the sleeping logic
-   * @param msg Why we sleep
-   * @param sleepMultiplier by how many times the default sleeping time is augmented
-   * @return True if <code>sleepMultiplier</code> is < <code>maxRetriesMultiplier</code>
-   */
-  public boolean sleepForRetries(String msg, int sleepMultiplier) {
-    try {
-      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
-      Thread.sleep(this.sleepForRetries * sleepMultiplier);
-    } catch (InterruptedException e) {
-      LOG.debug("Interrupted while sleeping between retries");
-      Thread.currentThread().interrupt();
-    }
-    return sleepMultiplier < maxRetriesMultiplier;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 9cbb095..3cd356d42 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -33,11 +33,10 @@ import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
 import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
@@ -118,10 +117,10 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
   }
 
   private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
-    Path remoteWALDirPath = new Path(remoteWALDir);
-    FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf);
-    return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs,
-      CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId),
+    return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf),
+      ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
+      CommonFSUtils.getWALRootDir(conf),
+      ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId),
       getWALDirectoryName(factory.factoryId),
       getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true,
       getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
   }
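
createWAL above and removeRemoteWALs in the manager now derive the remote location from the same two ReplicationUtils helpers, so the writer and the cleaner cannot disagree about the layout. A small sketch of the convention (the hdfs://backup URI and peer id "1" are made-up values, and conf is assumed in scope):

    // getRemoteWALDirForPeer nests the peer id under the configured remote dir:
    //   <remoteWALDir>/<peerId>, e.g. hdfs://backup/hbase/remoteWALs/1
    Path peerDir = ReplicationUtils.getRemoteWALDirForPeer("hdfs://backup/hbase/remoteWALs", "1");
    // getRemoteWALFileSystem resolves the FileSystem from the directory's URI, so
    // deletes issued through it act on the remote cluster, not the local WAL fs.
    FileSystem remoteFs =
      ReplicationUtils.getRemoteWALFileSystem(conf, "hdfs://backup/hbase/remoteWALs");

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index ec6ec96..67f793d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java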
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -40,12 +39,13 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
-  ReplicationSourceManager manager;
-  String peerClusterId;
-  Path currentPath;
-  MetricsSource metrics;
-  WALFileLengthProvider walFileLengthProvider;
-  AtomicBoolean startup = new AtomicBoolean(false);
+  private ReplicationSourceManager manager;
+  private ReplicationPeer replicationPeer;
+  private String peerClusterId;
+  private Path currentPath;
+  private MetricsSource metrics;
+  private WALFileLengthProvider walFileLengthProvider;
+  private AtomicBoolean startup = new AtomicBoolean(false);
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
@@ -56,6 +56,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
     this.peerClusterId = peerClusterId;
     this.metrics = metrics;
     this.walFileLengthProvider = walFileLengthProvider;
+    this.replicationPeer = rp;
   }
 
   @Override
@@ -153,4 +154,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   public ServerName getServerWALsBelongTo() {
     return null;
   }
+
+  @Override
+  public ReplicationPeer getPeer() {
+    return replicationPeer;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c3b02fbe/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 482f49a..5ea3173 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -49,19 +51,19 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -71,6 +73,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -133,9 +136,9 @@ public abstract class TestReplicationSourceManager {
 
   protected static ZKWatcher zkw;
 
-  protected static HTableDescriptor htd;
+  protected static TableDescriptor htd;
 
-  protected static HRegionInfo hri;
+  protected static RegionInfo hri;
 
   protected static final byte[] r1 = Bytes.toBytes("r1");
 
@@ -156,6 +159,8 @@ public abstract class TestReplicationSourceManager {
 
   protected static Path logDir;
 
+  protected static Path remoteLogDir;
+
   protected static CountDownLatch latch;
 
   protected static List<String> files = new ArrayList<>();
@@ -185,10 +190,9 @@ public abstract class TestReplicationSourceManager {
     ZKClusterId.setClusterId(zkw, new ClusterId());
     FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
     fs = FileSystem.get(conf);
-    oldLogDir = new Path(utility.getDataTestDir(),
-        HConstants.HREGION_OLDLOGDIR_NAME);
-    logDir = new Path(utility.getDataTestDir(),
-        HConstants.HREGION_LOGDIR_NAME);
+    oldLogDir = utility.getDataTestDir(HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
+    remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
     replication = new Replication();
     replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
     managerOfCluster = getManagerFromCluster();
@@ -205,19 +209,16 @@ public abstract class TestReplicationSourceManager {
     }
     waitPeer(slaveId, manager, true);
 
-    htd = new HTableDescriptor(test);
-    HColumnDescriptor col = new HColumnDescriptor(f1);
-    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
-    htd.addFamily(col);
-    col = new HColumnDescriptor(f2);
-    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
-    htd.addFamily(col);
+    htd = TableDescriptorBuilder.newBuilder(test)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1)
+        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f2)).build();
 
     scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for(byte[] fam : htd.getFamiliesKeys()) {
+    for(byte[] fam : htd.getColumnFamilyNames()) {
       scopes.put(fam, 0);
     }
-    hri = new HRegionInfo(htd.getTableName(), r1, r2);
+    hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(r1).setEndKey(r2).build();
   }
 
   private static ReplicationSourceManager getManagerFromCluster() {
@@ -248,6 +249,7 @@ public abstract class TestReplicationSourceManager {
   private void cleanLogDir() throws IOException {
     fs.delete(logDir, true);
     fs.delete(oldLogDir, true);
+    fs.delete(remoteLogDir, true);
   }
 
   @Before
@@ -286,10 +288,10 @@ public abstract class TestReplicationSourceManager {
       .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
     final WAL wal = wals.getWAL(hri);
     manager.init();
-    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
-    htd.addFamily(new HColumnDescriptor(f1));
+    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("tableame"))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f1)).build();
     NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for(byte[] fam : htd.getFamiliesKeys()) {
+    for(byte[] fam : htd.getColumnFamilyNames()) {
       scopes.put(fam, 0);
     }
     // Testing normal log rolling every 20
@@ -329,7 +331,11 @@ public abstract class TestReplicationSourceManager {
 
     wal.rollWriter();
 
-    manager.logPositionAndCleanOldLogs("1", false,
+    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
+    when(source.getQueueId()).thenReturn("1");
+    when(source.isRecovered()).thenReturn(false);
+    when(source.isSyncReplication()).thenReturn(false);
+    manager.logPositionAndCleanOldLogs(source,
       new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
 
     wal.append(hri,
@@ -404,7 +410,11 @@ public abstract class TestReplicationSourceManager {
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
     String id = "1-" + server.getServerName().getServerName();
     assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
-    manager.cleanOldLogs(file2, false, id, true);
+    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
+    when(source.getQueueId()).thenReturn(id);
+    when(source.isRecovered()).thenReturn(true);
+    when(source.isSyncReplication()).thenReturn(false);
+    manager.cleanOldLogs(file2, false, source);
     // log1 should be deleted
     assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
   }
@@ -488,14 +498,13 @@ public abstract class TestReplicationSourceManager {
    * corresponding ReplicationSourceInterface correctly cleans up the corresponding
    * replication queue and ReplicationPeer.
    * See HBASE-16096.
-   * @throws Exception
    */
   @Test
   public void testPeerRemovalCleanup() throws Exception{
     String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
     final String peerId = "FakePeer";
-    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
-        .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
+    final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build();
     try {
       DummyServer server = new DummyServer();
       ReplicationQueueStorage rq = ReplicationStorageFactory
@@ -504,7 +513,7 @@ public abstract class TestReplicationSourceManager {
       // initialization to throw an exception.
       conf.set("replication.replicationsource.implementation",
           FailInitializeDummyReplicationSource.class.getName());
-      final ReplicationPeers rp = manager.getReplicationPeers();
+      manager.getReplicationPeers();
       // Set up the znode and ReplicationPeer for the fake peer
       // Don't wait for replication source to initialize, we know it won't.
       addPeerAndWait(peerId, peerConfig, false);
@@ -549,8 +558,8 @@ public abstract class TestReplicationSourceManager {
   @Test
   public void testRemovePeerMetricsCleanup() throws Exception {
     final String peerId = "DummyPeer";
-    final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
-        .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
+    final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build();
     try {
       MetricsReplicationSourceSource globalSource = getGlobalSource();
       final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
@@ -582,6 +591,40 @@ public abstract class TestReplicationSourceManager {
     }
   }
 
+  @Test
+  public void testRemoveRemoteWALs() throws IOException {
+    // make sure that we can deal with files which does not exist
+    String walNameNotExists = "remoteWAL.0";
+    Path wal = new Path(logDir, walNameNotExists);
+    manager.preLogRoll(wal);
+    manager.postLogRoll(wal);
+
+    Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
+    fs.mkdirs(remoteLogDirForPeer);
+    String walName = "remoteWAL.1";
+    Path remoteWAL =
+      new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    fs.create(remoteWAL).close();
+    wal = new Path(logDir, walName);
+    manager.preLogRoll(wal);
+    manager.postLogRoll(wal);
+
+    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
+    when(source.getPeerId()).thenReturn(slaveId);
+    when(source.getQueueId()).thenReturn(slaveId);
+    when(source.isRecovered()).thenReturn(false);
+    when(source.isSyncReplication()).thenReturn(true);
+    ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
+    when(config.getRemoteWALDir())
+      .thenReturn(remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+    ReplicationPeer peer = mock(ReplicationPeer.class);
+    when(peer.getPeerConfig()).thenReturn(config);
+    when(source.getPeer()).thenReturn(peer);
+    manager.cleanOldLogs(walName, true, source);
+
+    assertFalse(fs.exists(remoteWAL));
+  }
+
   /**
    * Add a peer and wait for it to initialize
    * @param peerId