This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git
commit bc5eb2269aefeb337aecd6933d3486509f25f08a Author: Duo Zhang <zhang...@apache.org> AuthorDate: Sat Aug 20 23:10:58 2022 +0800 HBASE-27213 Add support for claim queue operation (#4708) Signed-off-by: Xin Sun <ddu...@gmail.com> --- .../protobuf/server/master/MasterProcedure.proto | 6 +-- .../AssignReplicationQueuesProcedure.java | 13 ++--- .../master/replication/ModifyPeerProcedure.java | 2 +- .../master/replication/RemovePeerProcedure.java | 41 +++++++++++++- .../regionserver/ReplicationSourceManager.java | 37 +++++++++---- .../replication/TestClaimReplicationQueue.java | 2 +- ...java => TestRemovePeerProcedureWaitForSCP.java} | 63 +++++++++++++--------- .../replication/TestSerialReplicationFailover.java | 3 -- 8 files changed, 116 insertions(+), 51 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 2e0da0deb84..76a1d676487 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -515,6 +515,7 @@ message UpdatePeerConfigStateData { message RemovePeerStateData { optional ReplicationPeer peer_config = 1; + repeated int64 ongoing_assign_replication_queues_proc_ids = 2; } message EnablePeerStateData { @@ -714,9 +715,8 @@ message ModifyColumnFamilyStoreFileTrackerStateData { } enum AssignReplicationQueuesState { - ASSIGN_REPLICATION_QUEUES_PRE_CHECK = 1; - ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 2; - ASSIGN_REPLICATION_QUEUES_CLAIM = 3; + ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1; + ASSIGN_REPLICATION_QUEUES_CLAIM = 2; } message AssignReplicationQueuesStateData { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java index 
e7fb5e51715..d33259dd436 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; @@ -102,8 +103,12 @@ public class AssignReplicationQueuesProcedure } private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException { + Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream() + .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet()); ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage(); - List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer); + // filter out replication queue for deleted peers + List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer).stream() + .filter(q -> existingPeerIds.contains(q.getPeerId())).collect(Collectors.toList()); if (queueIds.isEmpty()) { LOG.debug("Finish claiming replication queues for {}", crashedServer); // we are done @@ -130,10 +135,6 @@ public class AssignReplicationQueuesProcedure throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { try { switch (state) { - case ASSIGN_REPLICATION_QUEUES_PRE_CHECK: - // TODO: reserved for implementing the fencing logic with Add/Remove/UpdatePeerProcedure - setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES); - return Flow.HAS_MORE_STATE; case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES: addMissingQueues(env); retryCounter = null; @@ -183,7 +184,7 @@ public class AssignReplicationQueuesProcedure 
@Override protected AssignReplicationQueuesState getInitialState() { - return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_PRE_CHECK; + return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index 67d70a166be..78b97620c01 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -74,7 +74,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi * update the peer storage. */ protected abstract void postPeerModification(MasterProcedureEnv env) - throws IOException, ReplicationException; + throws IOException, ReplicationException, ProcedureSuspendedException; protected void releaseLatch(MasterProcedureEnv env) { ProcedurePrepareLatch.releaseLatch(latch, this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java index 2042e846849..2fadc3fd664 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -18,10 +18,17 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import 
org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.yetus.audience.InterfaceAudience; @@ -40,6 +47,8 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { private ReplicationPeerConfig peerConfig; + private List<Long> ongoingAssignReplicationQueuesProcIds = Collections.emptyList(); + public RemovePeerProcedure() { } @@ -64,15 +73,43 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { @Override protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { env.getReplicationPeerManager().removePeer(peerId); + // record ongoing AssignReplicationQueuesProcedures after we update the peer storage + ongoingAssignReplicationQueuesProcIds = env.getMasterServices().getMasterProcedureExecutor() + .getProcedures().stream().filter(p -> p instanceof AssignReplicationQueuesProcedure) + .filter(p -> !p.isFinished()).map(Procedure::getProcId).collect(Collectors.toList()); } private void removeRemoteWALs(MasterProcedureEnv env) throws IOException { env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId); } + private void checkAssignReplicationQueuesFinished(MasterProcedureEnv env) + throws ProcedureSuspendedException { + if (ongoingAssignReplicationQueuesProcIds.isEmpty()) { + LOG.info("No ongoing assign replication queues procedures when removing peer {}, move on", + peerId); + } + ProcedureExecutor<MasterProcedureEnv> procExec = + env.getMasterServices().getMasterProcedureExecutor(); + long[] unfinishedProcIds = + ongoingAssignReplicationQueuesProcIds.stream().map(procExec::getProcedure) + .filter(p -> p != null && 
!p.isFinished()).mapToLong(Procedure::getProcId).toArray(); + if (unfinishedProcIds.length == 0) { + LOG.info( + "All assign replication queues procedures are finished when removing peer {}, move on", + peerId); + } else { + throw suspend(env.getMasterConfiguration(), backoff -> LOG.info( + "There are still {} pending assign replication queues procedures {} when removing peer {}, sleep {} secs", + unfinishedProcIds.length, Arrays.toString(unfinishedProcIds), peerId, backoff / 1000)); + } + } + @Override protected void postPeerModification(MasterProcedureEnv env) - throws IOException, ReplicationException { + throws IOException, ReplicationException, ProcedureSuspendedException { + checkAssignReplicationQueuesFinished(env); + if (peerConfig.isSyncReplication()) { removeRemoteWALs(env); } @@ -94,6 +131,7 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { if (peerConfig != null) { builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)); } + builder.addAllOngoingAssignReplicationQueuesProcIds(ongoingAssignReplicationQueuesProcIds); serializer.serialize(builder.build()); } @@ -104,5 +142,6 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { if (data.hasPeerConfig()) { this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig()); } + ongoingAssignReplicationQueuesProcIds = data.getOngoingAssignReplicationQueuesProcIdsList(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index b024392dbd5..4760bad5d23 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -236,7 +236,7 @@ public class ReplicationSourceManager { */ void init() throws IOException { for (String id 
: this.replicationPeers.getAllPeerIds()) { - addSource(id); + addSource(id, true); } } @@ -256,7 +256,7 @@ public class ReplicationSourceManager { throw new IOException(e); } if (added) { - addSource(peerId); + addSource(peerId, false); } } @@ -322,11 +322,16 @@ public class ReplicationSourceManager { /** * Add a normal source for the given peer on this region server. Meanwhile, add new replication * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal - * group and do replication + * group and do replication. + * <p/> + * We add a {@code init} parameter to indicate whether this is part of the initialization process. + * If so, we should skip adding the replication queues as this may introduce deadlock on region + * server start up and hbase:replication table online. * @param peerId the id of the replication peer + * @param init whether this call is part of the initialization process * @return the source that was created */ - void addSource(String peerId) throws IOException { + void addSource(String peerId, boolean init) throws IOException { ReplicationPeer peer = replicationPeers.getPeer(peerId); if ( ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME @@ -351,11 +356,16 @@ public class ReplicationSourceManager { NavigableSet<String> wals = new TreeSet<>(); wals.add(walPath.getName()); walsByGroup.put(walPrefixAndPath.getKey(), wals); - // Abort RS and throw exception to make add peer failed - // TODO: can record the length of the current wal file so we could replicate less data - abortAndThrowIOExceptionWhenFail( - () -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(), - new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap())); + if (!init) { + // Abort RS and throw exception to make add peer failed + // Ideally we'd better use the current file size as offset so we can skip replicating + // the data before adding replication peer, but the problem is that the file may not end + // at
a valid entry's ending, and the current WAL Reader implementation can not deal + // with reading from the middle of a WAL entry. Can improve later. + abortAndThrowIOExceptionWhenFail( + () -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(), + new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap())); + } src.enqueueLog(walPath); LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId()); } @@ -793,9 +803,15 @@ public class ReplicationSourceManager { * @return {@code true} means we should replicate the given {@code wal}, otherwise {@code false}. */ private boolean shouldReplicate(ReplicationGroupOffset offset, String wal) { - if (offset == null || offset == ReplicationGroupOffset.BEGIN) { + // skip replicating meta wals + if (AbstractFSWALProvider.isMetaFile(wal)) { return false; } + // if no offset or the offset is just a place marker, replicate + if (offset == null || offset == ReplicationGroupOffset.BEGIN) { + return true; + } + // otherwise, compare the timestamp long walTs = AbstractFSWALProvider.getTimestamp(wal); long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal()); if (walTs < startWalTs) { @@ -890,7 +906,6 @@ public class ReplicationSourceManager { LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(), groupOffset); } - walFilesPQ.add(file); } // the method is a bit long, so assign it to null here to avoid later we reuse it again by // mistake, we should use the sorted walFilesPQ instead diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java index a12081a7636..de226b13e8f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java @@ -156,7 +156,7 @@ 
public class TestClaimReplicationQueue extends TestReplicationBase { hbaseAdmin.enableReplicationPeer(PEER_ID3); EMPTY = false; - // wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub procedure of SCP + // wait until the SCP finished, AssignReplicationQueuesProcedure is a sub procedure of SCP UTIL1.waitFor(30000, () -> master.getProcedures().stream() .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemovePeerProcedureWaitForSCP.java similarity index 73% copy from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java copy to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemovePeerProcedureWaitForSCP.java index a12081a7636..e93fa3b01e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemovePeerProcedureWaitForSCP.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.replication; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.assertEquals; + import java.io.IOException; import java.util.Collections; import java.util.List; @@ -32,6 +36,7 @@ import org.apache.hadoop.hbase.master.RegionServerList; import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.master.replication.AssignReplicationQueuesProcedure; +import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -43,18 +48,20 @@ import 
org.junit.experimental.categories.Category; import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; /** - * In HBASE-26029, we reimplement the claim queue operation with proc-v2 and make it a step in SCP, - * this is a UT to make sure the {@link AssignReplicationQueuesProcedure} works correctly. + * Make sure we will wait until all the SCPs finished in RemovePeerProcedure. + * <p/> + * See HBASE-27109 for more details. */ @Category({ ReplicationTests.class, LargeTests.class }) -public class TestClaimReplicationQueue extends TestReplicationBase { +public class TestRemovePeerProcedureWaitForSCP extends TestReplicationBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestClaimReplicationQueue.class); + HBaseClassTestRule.forClass(TestRemovePeerProcedureWaitForSCP.class); private static final TableName tableName3 = TableName.valueOf("test3"); @@ -62,8 +69,6 @@ public class TestClaimReplicationQueue extends TestReplicationBase { private static Table table3; - private static Table table4; - private static volatile boolean EMPTY = false; public static final class ServerManagerForTest extends ServerManager { @@ -106,14 +111,6 @@ public class TestClaimReplicationQueue extends TestReplicationBase { TestReplicationBase.setUpBeforeClass(); createTable(tableName3); table3 = connection1.getTable(tableName3); - table4 = connection2.getTable(tableName3); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - Closeables.close(table3, true); - Closeables.close(table4, true); - TestReplicationBase.tearDownAfterClass(); } @Override @@ -130,15 +127,21 @@ public class TestClaimReplicationQueue extends TestReplicationBase { removePeer(PEER_ID3); } + @AfterClass + public static void tearDownAfterClass() throws 
Exception { + Closeables.close(table3, true); + TestReplicationBase.tearDownAfterClass(); + } + @Test - public void testClaim() throws Exception { + public void testWait() throws Exception { // disable the peers hbaseAdmin.disableReplicationPeer(PEER_ID2); hbaseAdmin.disableReplicationPeer(PEER_ID3); // put some data - int count1 = UTIL1.loadTable(htable1, famName); - int count2 = UTIL1.loadTable(table3, famName); + UTIL1.loadTable(htable1, famName); + UTIL1.loadTable(table3, famName); EMPTY = true; UTIL1.getMiniHBaseCluster().stopRegionServer(0).join(); @@ -152,16 +155,26 @@ public class TestClaimReplicationQueue extends TestReplicationBase { .filter(p -> p instanceof AssignReplicationQueuesProcedure) .anyMatch(p -> p.getState() == ProcedureState.WAITING_TIMEOUT)); - hbaseAdmin.enableReplicationPeer(PEER_ID2); - hbaseAdmin.enableReplicationPeer(PEER_ID3); - + // call remove replication peer, and make sure it will be stuck in the POST_PEER_MODIFICATION + // state. + hbaseAdmin.removeReplicationPeerAsync(PEER_ID3); + UTIL1.waitFor(30000, + () -> master.getProcedures().stream().filter(p -> p instanceof RemovePeerProcedure) + .anyMatch(p -> ((RemovePeerProcedure) p).getCurrentStateId() + == PeerModificationState.POST_PEER_MODIFICATION_VALUE)); + Thread.sleep(5000); + assertEquals(PeerModificationState.POST_PEER_MODIFICATION_VALUE, + ((RemovePeerProcedure) master.getProcedures().stream() + .filter(p -> p instanceof RemovePeerProcedure).findFirst().get()).getCurrentStateId()); EMPTY = false; - // wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub procedure of SCP + // wait until the SCP finished, AssignReplicationQueuesProcedure is a sub procedure of SCP UTIL1.waitFor(30000, () -> master.getProcedures().stream() .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess)); - - // we should get all the data in the target cluster - waitForReplication(htable2, count1, NB_RETRIES); - waitForReplication(table4, count2, NB_RETRIES); + // 
the RemovePeerProcedure should have also finished + UTIL1.waitFor(30000, () -> master.getProcedures().stream() + .filter(p -> p instanceof RemovePeerProcedure).allMatch(Procedure::isSuccess)); + // make sure there are no remaining replication queues for PEER_ID3 + assertThat(master.getReplicationPeerManager().getQueueStorage().listAllQueueIds(PEER_ID3), + empty()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java index 6906db4cd46..1295ea14abc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java @@ -32,12 +32,9 @@ import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -// revisit later when we reviewing the implementation for serial replication -@Ignore @Category({ ReplicationTests.class, MediumTests.class }) public class TestSerialReplicationFailover extends SerialReplicationTestBase {