HBASE-20456 Support removing a ReplicationSourceShipper for a special wal group
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/75046eeb Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/75046eeb Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/75046eeb Branch: refs/heads/HBASE-19064 Commit: 75046eebf6d8bfaabdbdb8c2f16f8930ce892441 Parents: b564486 Author: zhangduo <zhang...@apache.org> Authored: Tue Apr 24 22:01:21 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri May 4 17:54:52 2018 +0800 ---------------------------------------------------------------------- .../hbase/regionserver/wal/AsyncFSWAL.java | 1 + .../RecoveredReplicationSource.java | 13 +--- .../RecoveredReplicationSourceShipper.java | 7 -- .../regionserver/ReplicationSource.java | 13 +++- .../regionserver/ReplicationSourceManager.java | 19 ++++- .../regionserver/ReplicationSourceShipper.java | 20 +++-- .../ReplicationSourceWALReader.java | 9 ++- .../regionserver/WALEntryStream.java | 3 +- .../hadoop/hbase/wal/AbstractFSWALProvider.java | 28 ++++--- .../hbase/wal/SyncReplicationWALProvider.java | 10 ++- .../TestReplicationSourceManager.java | 5 +- .../TestSyncReplicationShipperQuit.java | 81 ++++++++++++++++++++ .../regionserver/TestWALEntryStream.java | 4 +- 13 files changed, 163 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 17133ed..f630e63 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -682,6 +682,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { protected void doShutdown() throws IOException { waitForSafePoint(); closeWriter(this.writer); + this.writer = null; closeExecutor.shutdown(); try { if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index a21ca44..f1bb538 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -144,15 +143,9 @@ public class RecoveredReplicationSource extends ReplicationSource { } void tryFinish() { - // use synchronize to make sure one last thread will clean the queue - synchronized (workerThreads) { - Threads.sleep(100);// wait a short while for other worker thread to fully exit - boolean allTasksDone = workerThreads.values().stream().allMatch(w -> w.isFinished()); - if (allTasksDone) { - this.getSourceMetrics().clear(); - manager.removeRecoveredSource(this); - LOG.info("Finished recovering queue {} with the following stats: {}", queueId, getStats()); - } + if (workerThreads.isEmpty()) { + this.getSourceMetrics().clear(); + manager.finishRecoveredSource(this); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index 91109cf..b0d4db0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -48,13 +48,6 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper } @Override - protected void noMoreData() { - LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, source.getQueueId()); - source.getSourceMetrics().incrCompletedRecoveryQueue(); - setWorkerState(WorkerState.FINISHED); - } - - @Override protected void postFinish() { source.tryFinish(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 01ccb11..1a27fc1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -62,6 +62,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; /** @@ -122,6 +123,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private long defaultBandwidth; private long currentBandwidth; private WALFileLengthProvider walFileLengthProvider; + @VisibleForTesting protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads = new ConcurrentHashMap<>(); @@ -192,6 +194,9 @@ public class ReplicationSource implements ReplicationSourceInterface { PriorityBlockingQueue<Path> queue = queues.get(logPrefix); if (queue == null) { queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); + // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise + // the shipper may quit immediately + queue.put(log); queues.put(logPrefix, queue); if (this.isSourceActive() && this.replicationEndpoint != null) { // new wal group observed after source startup, start a new worker thread to track it @@ -199,8 +204,10 @@ public class ReplicationSource implements ReplicationSourceInterface { // still not launched, so it's necessary to check workerThreads before start the worker tryStartNewShipper(logPrefix, queue); } + } else { + queue.put(log); } - queue.put(log); + this.metrics.incrSizeOfLogQueue(); // This will log a warning for each new log that gets created above the warn threshold int queueSize = queue.size(); @@ -611,5 +618,7 @@ public class ReplicationSource implements ReplicationSourceInterface { return queueStorage; } - + void removeWorker(ReplicationSourceShipper worker) { + workerThreads.remove(worker.walGroupId, worker); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index cbeba23..2d0d82b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -443,12 +443,25 @@ public class ReplicationSourceManager implements ReplicationListener { * Clear the metrics and related replication queue of the specified old source * @param src source to clear */ - void removeRecoveredSource(ReplicationSourceInterface src) { - LOG.info("Done with the recovered queue " + src.getQueueId()); - this.oldsources.remove(src); + private boolean removeRecoveredSource(ReplicationSourceInterface src) { + if (!this.oldsources.remove(src)) { + return false; + } + LOG.info("Done with the recovered queue {}", src.getQueueId()); // Delete queue from storage and memory deleteQueue(src.getQueueId()); this.walsByIdRecoveredQueues.remove(src.getQueueId()); + return true; + } + + void finishRecoveredSource(ReplicationSourceInterface src) { + synchronized (oldsources) { + if (!removeRecoveredSource(src)) { + return; + } + } + LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(), + src.getStats()); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 3f97b5e..b1361fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -50,13 +50,13 @@ public class ReplicationSourceShipper extends Thread { public enum WorkerState { RUNNING, STOPPED, - FINISHED, // The worker is done processing a recovered queue + FINISHED, // The worker is done processing a queue } private final Configuration conf; protected final String walGroupId; protected final PriorityBlockingQueue<Path> queue; - private final ReplicationSourceInterface source; + private final ReplicationSource source; // Last position in the log that we sent to ZooKeeper // It will be accessed by the stats thread so make it volatile @@ -73,7 +73,7 @@ public class ReplicationSourceShipper extends Thread { protected final int maxRetriesMultiplier; public ReplicationSourceShipper(Configuration conf, String walGroupId, - PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) { + PriorityBlockingQueue<Path> queue, ReplicationSource source) { this.conf = conf; this.walGroupId = walGroupId; this.queue = queue; @@ -98,7 +98,7 @@ public class ReplicationSourceShipper extends Thread { } try { WALEntryBatch entryBatch = entryReader.take(); - // the NO_MORE_DATA instance has no path so do not all shipEdits + // the NO_MORE_DATA instance has no path so do not call shipEdits if (entryBatch == WALEntryBatch.NO_MORE_DATA) { noMoreData(); } else { @@ -113,12 +113,20 @@ public class ReplicationSourceShipper extends Thread { if (!isFinished()) { setWorkerState(WorkerState.STOPPED); } else { + source.removeWorker(this); postFinish(); } } - // To be implemented by recovered shipper - protected void noMoreData() { + private void noMoreData() { + if (source.isRecovered()) { + LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, + source.getQueueId()); + source.getSourceMetrics().incrCompletedRecoveryQueue(); + } else { + LOG.debug("Finished queue for group {} of peer {}", walGroupId, source.getQueueId()); + } + setWorkerState(WorkerState.FINISHED); } // To be implemented by recovered shipper http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 64fd48d..61ab7c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -142,7 +142,7 @@ class ReplicationSourceWALReader extends Thread { entryBatchQueue.put(batch); sleepMultiplier = 1; } else { // got no entries and didn't advance position in WAL - handleEmptyWALEntryBatch(entryStream.getCurrentPath()); + handleEmptyWALEntryBatch(); entryStream.reset(); // reuse stream } } @@ -224,10 +224,11 @@ class ReplicationSourceWALReader extends Thread { return batch; } - private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { + private void handleEmptyWALEntryBatch() throws InterruptedException { LOG.trace("Didn't read any new entries from WAL"); - if (source.isRecovered()) { - // we're done with queue recovery, shut ourself down + if (logQueue.isEmpty()) { + // we're done with current queue, either this is a recovered queue, or it is the special group + // for a sync replication peer and the peer has been transited to DA or S state. setReaderRunning(false); // shuts down shipper thread immediately entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA); http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index b2c199e..0393af4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -304,7 +304,8 @@ class WALEntryStream implements Closeable { return true; } } else { - // no more files in queue, this could only happen for recovered queue. + // no more files in queue, this could happen for recovered queue, or for a wal group of a sync + // replication peer which has already been transited to DA or S. setCurrentPath(null); } return false; http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 5a3fba3..e528624 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; @@ -247,26 +248,30 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen if (walName == null) { throw new IllegalArgumentException("The WAL path couldn't be null"); } - final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER); - return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2 : 1)]); + Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(walName.getName()); + if (matcher.matches()) { + return Long.parseLong(matcher.group(2)); + } else { + throw new IllegalArgumentException(walName.getName() + " is not a valid wal file name"); + } } /** * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for * description. */ - private static final Pattern pattern = - Pattern.compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*"); + private static final Pattern WAL_FILE_NAME_PATTERN = + Pattern.compile("(.+)\\.(\\d+)(\\.[0-9A-Za-z]+)?"); /** * A WAL file name is of the format: <wal-name>{@link #WAL_FILE_NAME_DELIMITER} - * <file-creation-timestamp>[.meta]. provider-name is usually made up of a server-name and a - * provider-id + * <file-creation-timestamp>[.<suffix>]. provider-name is usually made up of a + * server-name and a provider-id * @param filename name of the file to validate * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> otherwise */ public static boolean validateWALFilename(String filename) { - return pattern.matcher(filename).matches(); + return WAL_FILE_NAME_PATTERN.matcher(filename).matches(); } /** @@ -517,10 +522,15 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen * log_prefix.filenumber.log_suffix * @param name Name of the WAL to parse * @return prefix of the log + * @throws IllegalArgumentException if the name passed in is not a valid wal file name * @see AbstractFSWAL#getCurrentFileName() */ public static String getWALPrefixFromWALName(String name) { - int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf("."); - return name.substring(0, endIndex); + Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name); + if (matcher.matches()) { + return matcher.group(1); + } else { + throw new IllegalArgumentException(name + " is not a valid wal file name"); + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 3b56aa2..8faccd7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.wal; -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDirectoryName; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName; @@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener; import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @@ -113,8 +113,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen channelClass = eventLoopGroupAndChannelClass.getSecond(); } + // Use a timestamp to make it identical. That means, after we transit the peer to DA/S and then + // back to A, the log prefix will be changed. This is used to simplify the implementation for + // replication source, where we do not need to consider that a terminated shipper could be added + // back. private String getLogPrefix(String peerId) { - return factory.factoryId + WAL_FILE_NAME_DELIMITER + peerId; + return factory.factoryId + "-" + EnvironmentEdgeManager.currentTime() + "-" + peerId; } private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException { @@ -250,7 +254,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen @Override public void peerSyncReplicationStateChange(String peerId, SyncReplicationState from, SyncReplicationState to, int stage) { - if (from == SyncReplicationState.ACTIVE && to == SyncReplicationState.DOWNGRADE_ACTIVE) { + if (from == SyncReplicationState.ACTIVE) { if (stage == 0) { Lock lock = createLock.acquireLock(peerId); try { http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 5ea3173..cff8ceb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Pair; @@ -393,8 +394,8 @@ public abstract class TestReplicationSourceManager { // populate some znodes in the peer znode SortedSet<String> files = new TreeSet<>(); String group = "testgroup"; - String file1 = group + ".log1"; - String file2 = group + ".log2"; + String file1 = group + "." + EnvironmentEdgeManager.currentTime() + ".log1"; + String file2 = group + "." + EnvironmentEdgeManager.currentTime() + ".log2"; files.add(file1); files.add(file2); for (String file : files) { http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSyncReplicationShipperQuit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSyncReplicationShipperQuit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSyncReplicationShipperQuit.java new file mode 100644 index 0000000..f6dc3d7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSyncReplicationShipperQuit.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.SyncReplicationTestBase; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Testcase for HBASE-20456. + */ +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestSyncReplicationShipperQuit extends SyncReplicationTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSyncReplicationShipperQuit.class); + + @Test + public void testShipperQuitWhenDA() throws Exception { + // set to serial replication + UTIL1.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig + .newBuilder(UTIL1.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build()); + UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig + .newBuilder(UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build()); + UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.STANDBY); + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.ACTIVE); + + writeAndVerifyReplication(UTIL1, UTIL2, 0, 100); + HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME); + DualAsyncFSWAL wal = + (DualAsyncFSWAL) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build()); + String walGroupId = + AbstractFSWALProvider.getWALPrefixFromWALName(wal.getCurrentFileName().getName()); + ReplicationSourceShipper shipper = + ((ReplicationSource) ((Replication) rs.getReplicationSourceService()).getReplicationManager() + .getSource(PEER_ID)).workerThreads.get(walGroupId); + assertFalse(shipper.isFinished()); + + UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, + SyncReplicationState.DOWNGRADE_ACTIVE); + writeAndVerifyReplication(UTIL1, UTIL2, 100, 200); + + ReplicationSource source = (ReplicationSource) ((Replication) rs.getReplicationSourceService()) + .getReplicationManager().getSource(PEER_ID); + // the peer is serial so here we can make sure that the previous wals have already been + // replicated, and finally the shipper should be removed from the worker pool + UTIL1.waitFor(10000, () -> !source.workerThreads.containsKey(walGroupId)); + assertTrue(shipper.isFinished()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/75046eeb/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 35e4f82..fac6f74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -413,9 +413,7 @@ public class TestWALEntryStream { batch = reader.take(); assertEquals(walPath, batch.getLastWalPath()); assertEquals(5, batch.getNbEntries()); - // Actually this should be true but we haven't handled this yet since for a normal queue the - // last one is always open... Not a big deal for now. - assertFalse(batch.isEndOfFile()); + assertTrue(batch.isEndOfFile()); assertSame(WALEntryBatch.NO_MORE_DATA, reader.take()); }