This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch HBASE-27109/table_based_rqs in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 4f5db83ac61d561afa864f9709e17cfd07d61b6c Author: Duo Zhang <zhang...@apache.org> AuthorDate: Thu Sep 15 22:58:29 2022 +0800 HBASE-27215 Add support for sync replication (#4762) Signed-off-by: Xiaolin Ha <haxiao...@apache.org> --- .../regionserver/ReplicationSource.java | 2 +- .../regionserver/ReplicationSourceManager.java | 53 +++++++++++----------- .../TestDrainReplicationQueuesForStandBy.java | 3 -- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 45b66bcb1dc..788fb4871c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -465,7 +465,7 @@ public class ReplicationSource implements ReplicationSourceInterface { t.getName()); manager.refreshSources(peerId); break; - } catch (IOException e1) { + } catch (IOException | ReplicationException e1) { LOG.error("Replication sources refresh failed.", e1); sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 3397fffc036..c887988fd92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -403,38 +403,44 @@ public class ReplicationSourceManager { // TODO: use empty initial offsets for now, revisit when adding support for sync replication ReplicationSourceInterface src = createSource(new ReplicationQueueData(queueId, ImmutableMap.of()), peer); - // synchronized here to avoid race with preLogRoll where we add new log to source and also + // synchronized here to avoid race with postLogRoll where we add new log to source and also // walsById. ReplicationSourceInterface toRemove; - Map<String, NavigableSet<String>> wals = new HashMap<>(); + ReplicationQueueData queueData; synchronized (latestPaths) { + // Here we make a copy of all the remaining wal files and then delete them from the + // replication queue storage after releasing the lock. It is not safe to just remove the old + // map from walsById since later we may fail to update the replication queue storage, and when + // we retry next time, we can not know the wal files that needs to be set to the replication + // queue storage + ImmutableMap.Builder<String, ReplicationGroupOffset> builder = ImmutableMap.builder(); + synchronized (walsById) { + walsById.get(queueId).forEach((group, wals) -> { + if (!wals.isEmpty()) { + builder.put(group, new ReplicationGroupOffset(wals.last(), -1)); + } + }); + } + queueData = new ReplicationQueueData(queueId, builder.build()); + src = createSource(queueData, peer); toRemove = sources.put(peerId, src); if (toRemove != null) { LOG.info("Terminate replication source for " + toRemove.getPeerId()); toRemove.terminate(terminateMessage); toRemove.getSourceMetrics().clear(); } - // Here we make a copy of all the remaining wal files and then delete them from the - // replication queue storage after releasing the lock. It is not safe to just remove the old - // map from walsById since later we may fail to delete them from the replication queue - // storage, and when we retry next time, we can not know the wal files that need to be deleted - // from the replication queue storage. - walsById.get(queueId).forEach((k, v) -> wals.put(k, new TreeSet<>(v))); + } + for (Map.Entry<String, ReplicationGroupOffset> entry : queueData.getOffsets().entrySet()) { + queueStorage.setOffset(queueId, entry.getKey(), entry.getValue(), Collections.emptyMap()); } LOG.info("Startup replication source for " + src.getPeerId()); src.startup(); - for (NavigableSet<String> walsByGroup : wals.values()) { - // TODO: just need to reset the replication offset - // for (String wal : walsByGroup) { - // queueStorage.removeWAL(server.getServerName(), peerId, wal); - // } - } synchronized (walsById) { - Map<String, NavigableSet<String>> oldWals = walsById.get(queueId); - wals.forEach((k, v) -> { - NavigableSet<String> walsByGroup = oldWals.get(k); + Map<String, NavigableSet<String>> wals = walsById.get(queueId); + queueData.getOffsets().forEach((group, offset) -> { + NavigableSet<String> walsByGroup = wals.get(group); if (walsByGroup != null) { - walsByGroup.removeAll(v); + walsByGroup.headSet(offset.getWal(), true).clear(); } }); } @@ -457,13 +463,8 @@ public class ReplicationSourceManager { } private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId queueId, - ReplicationPeer peer) throws IOException { - Map<String, ReplicationGroupOffset> offsets; - try { - offsets = queueStorage.getOffsets(queueId); - } catch (ReplicationException e) { - throw new IOException(e); - } + ReplicationPeer peer) throws IOException, ReplicationException { + Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId); return createSource(new ReplicationQueueData(queueId, ImmutableMap.copyOf(offsets)), peer); } @@ -473,7 +474,7 @@ public class ReplicationSourceManager { * replication queue storage and only to enqueue all logs to the new replication source * @param peerId the id of the replication peer */ - public void refreshSources(String peerId) throws IOException { + public void refreshSources(String peerId) throws ReplicationException, IOException { String terminateMessage = "Peer " + peerId + " state or config changed. Will close the previous replication source and open a new one"; ReplicationPeer peer = replicationPeers.getPeer(peerId); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java index 8918f8422e1..0189d475575 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java @@ -35,12 +35,9 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -// TODO: revisit later -@Ignore @Category({ ReplicationTests.class, MediumTests.class }) public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase {