This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new f2caf16b080 HBASE-29463 Bidirectional serial replication will block if a region’s last edit before rs crashed was from the peer cluster (#7172) (#7225)
f2caf16b080 is described below
commit f2caf16b080a57381fc52d8e7746b4123a986d3b
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Aug 14 15:28:30 2025 +0800
HBASE-29463 Bidirectional serial replication will block if a region’s last edit before rs crashed was from the peer cluster (#7172) (#7225)
(cherry picked from commit bea4272960e0c3a92eaf436ec6cc1c5a2527ffc3)
Signed-off-by: Nick Dimiduk <[email protected]>
---
.../hbase/replication/ChainWALEntryFilter.java | 7 ++
.../replication/ClusterMarkingEntryFilter.java | 4 +-
.../hbase/replication/ScopeWALEntryFilter.java | 16 +++--
.../hadoop/hbase/replication/WALEntryFilter.java | 14 ++++
.../hbase/replication/WALEntryFilterBase.java | 66 ++++++++++++++++++
.../regionserver/ReplicationSource.java | 1 +
.../SerialReplicationSourceWALReader.java | 16 +++--
.../TestBidirectionSerialReplicationStuck.java | 79 ++++++++++++++++++++++
.../hbase/replication/TestReplicationBase.java | 31 ++++++---
.../TestReplicationWALEntryFilters.java | 11 ++-
10 files changed, 218 insertions(+), 27 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
index aa84f4705b0..9683472f3ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
@@ -53,6 +53,13 @@ public class ChainWALEntryFilter implements WALEntryFilter {
initCellFilters();
}
+ @Override
+ public void setSerial(boolean serial) {
+ for (WALEntryFilter filter : filters) {
+ filter.setSerial(serial);
+ }
+ }
+
public void initCellFilters() {
ArrayList<WALCellFilter> cellFilters = new ArrayList<>(filters.length);
for (WALEntryFilter filter : filters) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
index e05e79eab5a..041b9798857 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
@@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceStability;
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
@InterfaceStability.Evolving
-public class ClusterMarkingEntryFilter implements WALEntryFilter {
+public class ClusterMarkingEntryFilter extends WALEntryFilterBase {
private UUID clusterId;
private UUID peerClusterId;
private ReplicationEndpoint replicationEndpoint;
@@ -64,6 +64,6 @@ public class ClusterMarkingEntryFilter implements WALEntryFilter {
return entry;
}
}
- return null;
+ return clearOrNull(entry);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 6dc41bcc014..1429379deb3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -26,24 +26,26 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
/**
* Keeps KVs that are scoped other than local
*/
@InterfaceAudience.Private
-public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
+public class ScopeWALEntryFilter extends WALEntryFilterBase implements WALCellFilter {
private final BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
@Override
public Entry filter(Entry entry) {
- // Do not filter out an entire entry by replication scopes. As now we support serial
- // replication, the sequence id of a marker is also needed by upper layer. We will filter out
- // all the cells in the filterCell method below if the replication scopes is null or empty.
- return entry;
+ NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
+ if (MapUtils.isNotEmpty(scopes)) {
+ return entry;
+ }
+ return clearOrNull(entry);
}
- private boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
+ private static boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
Integer scope = scopes.get(family);
return scope != null && scope.intValue() == HConstants.REPLICATION_SCOPE_GLOBAL;
}
@@ -51,7 +53,7 @@ public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
@Override
public Cell filterCell(Entry entry, Cell cell) {
NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
- if (scopes == null || scopes.isEmpty()) {
+ if (MapUtils.isEmpty(scopes)) {
return null;
}
byte[] family = CellUtil.cloneFamily(cell);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
index 8aa60f74ebb..d77fedd94bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
@@ -50,4 +50,18 @@ public interface WALEntryFilter {
* the entry to be skipped for replication.
*/
Entry filter(Entry entry);
+
+ /**
+ * Tell the filter whether the peer is a serial replication peer.
+ * <p>
+ * For serial replication, usually you should not filter out an entire entry, unless the peer
+ * config does not contain the table, because we need the region name and sequence id of the
+ * entry to advance the pushed sequence id; otherwise replication may be blocked. You can
+ * filter out all the cells of the entry to stop it being replicated to the peer cluster, or
+ * rely on the {@link WALCellFilter#filterCell(Entry, org.apache.hadoop.hbase.Cell)} method to
+ * filter all the cells out.
+ * @param serial {@code true} if the peer is a serial replication peer, otherwise {@code false}
+ */
+ default void setSerial(boolean serial) {
+ }
}
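
To illustrate the new contract for downstream filter implementations, here is a minimal sketch (the class name DropTableWALEntryFilter, its constructor, and the dropped-table behavior are hypothetical, not part of this patch) of a filter that honors setSerial by clearing cells instead of dropping the whole entry:

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.wal.WAL.Entry;

    // Hypothetical filter: drop all edits for one table. Under serial
    // replication we keep the entry shell so its region name and sequence id
    // can still advance the last pushed sequence id.
    public class DropTableWALEntryFilter implements WALEntryFilter {

      private final TableName toDrop;
      private boolean serial;

      public DropTableWALEntryFilter(TableName toDrop) {
        this.toDrop = toDrop;
      }

      @Override
      public void setSerial(boolean serial) {
        this.serial = serial;
      }

      @Override
      public Entry filter(Entry entry) {
        if (!entry.getKey().getTableName().equals(toDrop)) {
          return entry;
        }
        if (serial) {
          // keep the WALKey for progress accounting, but ship no cells
          entry.getEdit().getCells().clear();
          return entry;
        }
        // for non-serial peers the whole entry can be skipped
        return null;
      }
    }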
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilterBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilterBase.java
new file mode 100644
index 00000000000..81efae0e7a8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilterBase.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Base class for {@link WALEntryFilter} implementations; stores the necessary common
+ * properties, such as {@link #serial}.
+ * <p>
+ * Why we need to treat serial replication specially:
+ * <p>
+ * Under some special cases, we may filter out some entries but still need to record the last
+ * pushed sequence id for them. For example, when we set up a bidirectional replication
+ * A <-> B and write to both cluster A and cluster B, cluster A will not replicate the
+ * entries which are replicated from cluster B, which means we may have holes in the
+ * replication sequence ids. So if the region is closed abnormally, i.e., we do not have a
+ * close event for the region, and the entries just before the close came from cluster B,
+ * then the replication from cluster A to cluster B will be stuck if we do not record the
+ * last pushed sequence id of these entries, because we will find that the previous sequence
+ * id range never finishes. So we need to record the sequence id for these entries so the
+ * last pushed sequence id can reach the region barrier.
+ * @see <a href="https://issues.apache.org/jira/browse/HBASE-29463">HBASE-29463</a>
+ */
[email protected](HBaseInterfaceAudience.REPLICATION)
+public abstract class WALEntryFilterBase implements WALEntryFilter {
+
+ protected boolean serial;
+
+ @Override
+ public void setSerial(boolean serial) {
+ this.serial = serial;
+ }
+
+ /**
+ * Call this method when you do not need to replicate the entry.
+ * <p>
+ * For serial replication, since we still need the WALKey for recording progress, we clear
+ * all the cells of the WALEdit. For normal replication, we just return null.
+ */
+ protected final Entry clearOrNull(Entry entry) {
+ if (serial) {
+ entry.getEdit().getCells().clear();
+ return entry;
+ } else {
+ return null;
+ }
+ }
+}
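
Built on this base class, the hypothetical DropTableWALEntryFilter sketch from above shrinks considerably, because clearOrNull encapsulates the serial vs. non-serial decision:

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.wal.WAL.Entry;

    // Hypothetical filter, rewritten on top of WALEntryFilterBase.
    public class DropTableWALEntryFilter extends WALEntryFilterBase {

      private final TableName toDrop;

      public DropTableWALEntryFilter(TableName toDrop) {
        this.toDrop = toDrop;
      }

      @Override
      public Entry filter(Entry entry) {
        if (!entry.getKey().getTableName().equals(toDrop)) {
          return entry;
        }
        // clears the cells for serial peers (so progress is still recorded),
        // returns null otherwise
        return clearOrNull(entry);
      }
    }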
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 99bb26da5fa..fbfb81b5ae1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -335,6 +335,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
this.walEntryFilter = new ChainWALEntryFilter(filters);
+ this.walEntryFilter.setSerial(replicationPeer.getPeerConfig().isSerial());
}
private void tryStartNewShipper(String walGroupId) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 1a8bbf74a2c..7cd6dde9eb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -83,17 +83,14 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
}
}
- // arrive here means we can push the entry, record the last sequence id
- batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
- entry.getKey().getSequenceId());
// actually remove the entry.
- removeEntryFromStream(entryStream, batch);
+ removeEntryFromStream(entry, entryStream, batch);
if (addEntryToBatch(batch, entry)) {
break;
}
} else {
// actually remove the entry.
- removeEntryFromStream(entryStream, batch);
+ removeEntryFromStream(null, entryStream, batch);
}
boolean hasNext = entryStream.hasNext();
// always return if we have switched to a new file.
@@ -107,10 +104,15 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
}
}
- private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
+ private void removeEntryFromStream(Entry entry, WALEntryStream entryStream, WALEntryBatch batch)
throws IOException {
entryStream.next();
- firstCellInEntryBeforeFiltering = null;
batch.setLastWalPosition(entryStream.getPosition());
+ // record last pushed sequence id if needed
+ if (entry != null) {
+ batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
+ entry.getKey().getSequenceId());
+ }
+ firstCellInEntryBeforeFiltering = null;
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestBidirectionSerialReplicationStuck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestBidirectionSerialReplicationStuck.java
new file mode 100644
index 00000000000..f069d6b1095
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestBidirectionSerialReplicationStuck.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestBidirectionSerialReplicationStuck extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBidirectionSerialReplicationStuck.class);
+
+ @Override
+ protected boolean isSerialPeer() {
+ return true;
+ }
+
+ @Override
+ public void setUpBase() throws Exception {
+ UTIL1.ensureSomeRegionServersAvailable(2);
+ hbaseAdmin.balancerSwitch(false, true);
+ addPeer(PEER_ID2, tableName, UTIL1, UTIL2);
+ addPeer(PEER_ID2, tableName, UTIL2, UTIL1);
+ }
+
+ @Override
+ public void tearDownBase() throws Exception {
+ removePeer(PEER_ID2, UTIL1);
+ removePeer(PEER_ID2, UTIL2);
+ }
+
+ @Test
+ public void testStuck() throws Exception {
+ // disable the peer cluster1 -> cluster2
+ hbaseAdmin.disableReplicationPeer(PEER_ID2);
+ byte[] qualifier = Bytes.toBytes("q");
+ htable1.put(new Put(Bytes.toBytes("aaa-1")).addColumn(famName, qualifier,
Bytes.toBytes(1)));
+
+ // add a row to cluster2 and wait for it to replicate back to cluster1
+ htable2.put(new Put(Bytes.toBytes("aaa-2")).addColumn(famName, qualifier, Bytes.toBytes(2)));
+ UTIL1.waitFor(30000, () -> htable1.exists(new Get(Bytes.toBytes("aaa-2"))));
+
+ // kill the region server which holds the region which contains our rows
+ UTIL1.getRSForFirstRegionInTable(tableName).abort("for testing");
+ // wait until the region is online
+ UTIL1.waitFor(30000, () -> htable1.exists(new Get(Bytes.toBytes("aaa-2"))));
+
+ // put a new row in cluster1
+ htable1.put(new Put(Bytes.toBytes("aaa-3")).addColumn(famName, qualifier,
Bytes.toBytes(3)));
+
+ // enable peer cluster1 -> cluster2, the new row should be replicated to cluster2
+ hbaseAdmin.enableReplicationPeer(PEER_ID2);
+ UTIL1.waitFor(30000, () -> htable2.exists(new Get(Bytes.toBytes("aaa-3"))));
+ }
+}
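
As a usage note, the new test can typically be run on its own with the usual Maven invocation (module path and flag assumed; adjust to your build setup):

    mvn test -pl hbase-server -Dtest=TestBidirectionSerialReplicationStuck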
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index fba25aee4a3..b021dcbc0d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -272,16 +272,27 @@ public class TestReplicationBase {
}
private boolean peerExist(String peerId) throws IOException {
- return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
+ return peerExist(peerId, UTIL1);
+ }
+
+ private boolean peerExist(String peerId, HBaseTestingUtility util) throws IOException {
+ return util.getAdmin().listReplicationPeers().stream()
+ .anyMatch(p -> peerId.equals(p.getPeerId()));
}
protected final void addPeer(String peerId, TableName tableName) throws Exception {
- if (!peerExist(peerId)) {
- ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
- .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer())
- .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
- hbaseAdmin.addReplicationPeer(peerId, builder.build());
+ addPeer(peerId, tableName, UTIL1, UTIL2);
+ }
+
+ protected final void addPeer(String peerId, TableName tableName, HBaseTestingUtility source,
+ HBaseTestingUtility target) throws Exception {
+ if (peerExist(peerId, source)) {
+ return;
}
+ ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
+ .setClusterKey(target.getClusterKey()).setSerial(isSerialPeer())
+ .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
+ source.getAdmin().addReplicationPeer(peerId, builder.build());
}
@Before
@@ -290,8 +301,12 @@ public class TestReplicationBase {
}
protected final void removePeer(String peerId) throws Exception {
- if (peerExist(peerId)) {
- hbaseAdmin.removeReplicationPeer(peerId);
+ removePeer(peerId, UTIL1);
+ }
+
+ protected final void removePeer(String peerId, HBaseTestingUtility util) throws Exception {
+ if (peerExist(peerId, util)) {
+ util.getAdmin().removeReplicationPeer(peerId);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index 1e26a940b2f..762945d745b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -113,15 +113,20 @@ public class TestReplicationWALEntryFilters {
Entry userEntryEmpty = createEntry(null);
// no scopes
- // now we will not filter out entries without a replication scope since serial replication still
- // need the sequence id, but the cells will all be filtered out.
+ assertNull(filter.filter(userEntry));
+ // now for serial replication, we will not filter out entries without a replication scope since
+ // serial replication still needs the sequence id, but the cells will all be filtered out.
+ filter.setSerial(true);
assertTrue(filter.filter(userEntry).getEdit().isEmpty());
+ filter.setSerial(false);
// empty scopes
- // ditto
TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
userEntry = createEntry(scopes, a, b);
+ assertNull(filter.filter(userEntry));
+ filter.setSerial(true);
assertTrue(filter.filter(userEntry).getEdit().isEmpty());
+ filter.setSerial(false);
// different scope
scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);