This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new b894d4a19ac HBASE-29463 Bidirectional serial replication will block if a region’s last edit before rs crashed was from the peer cluster (#7172) (#7225)
b894d4a19ac is described below

commit b894d4a19ac7e6c70e983a831d326115dba7e828
Author: Duo Zhang <[email protected]>
AuthorDate: Thu Aug 14 15:28:30 2025 +0800

    HBASE-29463 Bidirectional serial replication will block if a region’s last edit before rs crashed was from the peer cluster (#7172) (#7225)
    
    (cherry picked from commit bea4272960e0c3a92eaf436ec6cc1c5a2527ffc3)
    
    Signed-off-by: Nick Dimiduk <[email protected]>
---
 .../hbase/replication/ChainWALEntryFilter.java     |  7 ++
 .../replication/ClusterMarkingEntryFilter.java     |  4 +-
 .../hbase/replication/ScopeWALEntryFilter.java     | 16 +++--
 .../hadoop/hbase/replication/WALEntryFilter.java   | 14 ++++
 .../hbase/replication/WALEntryFilterBase.java      | 66 ++++++++++++++++++
 .../regionserver/ReplicationSource.java            |  1 +
 .../SerialReplicationSourceWALReader.java          | 16 +++--
 .../TestBidirectionSerialReplicationStuck.java     | 79 ++++++++++++++++++++++
 .../hbase/replication/TestReplicationBase.java     | 31 ++++++---
 .../TestReplicationWALEntryFilters.java            | 11 ++-
 10 files changed, 218 insertions(+), 27 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
index aa84f4705b0..9683472f3ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
@@ -53,6 +53,13 @@ public class ChainWALEntryFilter implements WALEntryFilter {
     initCellFilters();
   }
 
+  @Override
+  public void setSerial(boolean serial) {
+    for (WALEntryFilter filter : filters) {
+      filter.setSerial(serial);
+    }
+  }
+
   public void initCellFilters() {
     ArrayList<WALCellFilter> cellFilters = new ArrayList<>(filters.length);
     for (WALEntryFilter filter : filters) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
index e05e79eab5a..041b9798857 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
@@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceStability;
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 @InterfaceStability.Evolving
-public class ClusterMarkingEntryFilter implements WALEntryFilter {
+public class ClusterMarkingEntryFilter extends WALEntryFilterBase {
   private UUID clusterId;
   private UUID peerClusterId;
   private ReplicationEndpoint replicationEndpoint;
@@ -64,6 +64,6 @@ public class ClusterMarkingEntryFilter implements WALEntryFilter {
         return entry;
       }
     }
-    return null;
+    return clearOrNull(entry);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 6dc41bcc014..1429379deb3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -26,24 +26,26 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
 
 /**
  * Keeps KVs that are scoped other than local
  */
 @InterfaceAudience.Private
-public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
+public class ScopeWALEntryFilter extends WALEntryFilterBase implements WALCellFilter {
 
   private final BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
 
   @Override
   public Entry filter(Entry entry) {
-    // Do not filter out an entire entry by replication scopes. As now we support serial
-    // replication, the sequence id of a marker is also needed by upper layer. We will filter out
-    // all the cells in the filterCell method below if the replication scopes is null or empty.
-    return entry;
+    NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
+    if (MapUtils.isNotEmpty(scopes)) {
+      return entry;
+    }
+    return clearOrNull(entry);
   }
 
-  private boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
+  private static boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
     Integer scope = scopes.get(family);
     return scope != null && scope.intValue() == HConstants.REPLICATION_SCOPE_GLOBAL;
   }
@@ -51,7 +53,7 @@ public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
   @Override
   public Cell filterCell(Entry entry, Cell cell) {
     NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
-    if (scopes == null || scopes.isEmpty()) {
+    if (MapUtils.isEmpty(scopes)) {
       return null;
     }
     byte[] family = CellUtil.cloneFamily(cell);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
index 8aa60f74ebb..d77fedd94bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
@@ -50,4 +50,18 @@ public interface WALEntryFilter {
    *         the entry to be skipped for replication.
    */
   Entry filter(Entry entry);
+
+  /**
+   * Tell the filter whether the peer is a serial replication peer.
+   * <p>
+   * For serial replication, usually you should not filter out an entire entry, unless the peer
+   * config does not contain the table, because we need the region name and sequence id of the entry
+   * to advance the pushed sequence id, otherwise the replication may be blocked. You can just
+   * filter out all the cells of the entry to stop it from being replicated to the peer cluster, or
+   * just rely on the {@link WALCellFilter#filterCell(Entry, org.apache.hadoop.hbase.Cell)} method
+   * to filter all the cells out.
+   * @param serial {@code true} if the peer is a serial replication peer, otherwise {@code false}
+   */
+  default void setSerial(boolean serial) {
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilterBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilterBase.java
new file mode 100644
index 00000000000..81efae0e7a8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilterBase.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Base class for {@link WALEntryFilter} which stores the necessary common properties, such as
+ * {@link #serial}.
+ * <p>
+ * Why serial replication needs special treatment:
+ * <p>
+ * Under some special cases, we may filter out some entries but we still need to record the last
+ * pushed sequence id for these entries. For example, when we set up a bidirectional replication A
+ * &lt;-&gt; B, if we write to both cluster A and cluster B, cluster A will not replicate the
+ * entries which are replicated from cluster B, which means we may have holes in the replication
+ * sequence ids. So if the region is closed abnormally, i.e., we do not have a close event for the
+ * region, and before the closing, we have some entries from cluster B, then the replication from
+ * cluster A to cluster B will be stuck if we do not record the last pushed sequence id of these
+ * entries, because we will find that the previous sequence id range never finishes. So we
+ * need to record the sequence id for these entries so the last pushed sequence id can reach the
+ * region barrier.
+ * @see <a href="https://issues.apache.org/jira/browse/HBASE-29463">HBASE-29463</a>
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public abstract class WALEntryFilterBase implements WALEntryFilter {
+
+  protected boolean serial;
+
+  @Override
+  public void setSerial(boolean serial) {
+    this.serial = serial;
+  }
+
+  /**
+   * Call this method when you do not need to replicate the entry.
+   * <p>
+   * For serial replication, since we still need the WALKey for recording progress, we clear all the
+   * cells of the WALEdit. For normal replication, we just return null.
+   */
+  protected final Entry clearOrNull(Entry entry) {
+    if (serial) {
+      entry.getEdit().getCells().clear();
+      return entry;
+    } else {
+      return null;
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index aebdccdc92d..2ce15512554 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -332,6 +332,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
     filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
     this.walEntryFilter = new ChainWALEntryFilter(filters);
+    this.walEntryFilter.setSerial(replicationPeer.getPeerConfig().isSerial());
   }
 
   private void tryStartNewShipper(String walGroupId) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 41d95df2821..d1a2e8b5734 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -97,17 +97,14 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
           }
           sleepMultiplier = sleep(sleepMultiplier);
         }
-        // arrive here means we can push the entry, record the last sequence id
-        batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
-          entry.getKey().getSequenceId());
         // actually remove the entry.
-        removeEntryFromStream(entryStream, batch);
+        removeEntryFromStream(entry, entryStream, batch);
         if (addEntryToBatch(batch, entry)) {
           break;
         }
       } else {
         // actually remove the entry.
-        removeEntryFromStream(entryStream, batch);
+        removeEntryFromStream(null, entryStream, batch);
       }
       WALEntryStream.HasNext hasNext = entryStream.hasNext();
       // always return if we have switched to a new file.
@@ -125,9 +122,14 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
     }
   }
 
-  private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch) {
+  private void removeEntryFromStream(Entry entry, WALEntryStream entryStream, WALEntryBatch batch) {
     entryStream.next();
-    firstCellInEntryBeforeFiltering = null;
     batch.setLastWalPosition(entryStream.getPosition());
+    // record last pushed sequence id if needed
+    if (entry != null) {
+      batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
+        entry.getKey().getSequenceId());
+    }
+    firstCellInEntryBeforeFiltering = null;
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestBidirectionSerialReplicationStuck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestBidirectionSerialReplicationStuck.java
new file mode 100644
index 00000000000..f069d6b1095
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestBidirectionSerialReplicationStuck.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestBidirectionSerialReplicationStuck extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBidirectionSerialReplicationStuck.class);
+
+  @Override
+  protected boolean isSerialPeer() {
+    return true;
+  }
+
+  @Override
+  public void setUpBase() throws Exception {
+    UTIL1.ensureSomeRegionServersAvailable(2);
+    hbaseAdmin.balancerSwitch(false, true);
+    addPeer(PEER_ID2, tableName, UTIL1, UTIL2);
+    addPeer(PEER_ID2, tableName, UTIL2, UTIL1);
+  }
+
+  @Override
+  public void tearDownBase() throws Exception {
+    removePeer(PEER_ID2, UTIL1);
+    removePeer(PEER_ID2, UTIL2);
+  }
+
+  @Test
+  public void testStuck() throws Exception {
+    // disable the peer cluster1 -> cluster2
+    hbaseAdmin.disableReplicationPeer(PEER_ID2);
+    byte[] qualifier = Bytes.toBytes("q");
+    htable1.put(new Put(Bytes.toBytes("aaa-1")).addColumn(famName, qualifier, Bytes.toBytes(1)));
+
+    // add a row to cluster2 and wait for it to replicate back to cluster1
+    htable2.put(new Put(Bytes.toBytes("aaa-2")).addColumn(famName, qualifier, Bytes.toBytes(2)));
+    UTIL1.waitFor(30000, () -> htable1.exists(new Get(Bytes.toBytes("aaa-2"))));
+
+    // kill the region server which holds the region which contains our rows
+    UTIL1.getRSForFirstRegionInTable(tableName).abort("for testing");
+    // wait until the region is online
+    UTIL1.waitFor(30000, () -> htable1.exists(new Get(Bytes.toBytes("aaa-2"))));
+
+    // put a new row in cluster1
+    htable1.put(new Put(Bytes.toBytes("aaa-3")).addColumn(famName, qualifier, Bytes.toBytes(3)));
+
+    // enable peer cluster1 -> cluster2, the new row should be replicated to cluster2
+    hbaseAdmin.enableReplicationPeer(PEER_ID2);
+    UTIL1.waitFor(30000, () -> htable2.exists(new Get(Bytes.toBytes("aaa-3"))));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index fba25aee4a3..b021dcbc0d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -272,16 +272,27 @@ public class TestReplicationBase {
   }
 
   private boolean peerExist(String peerId) throws IOException {
-    return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
+    return peerExist(peerId, UTIL1);
+  }
+
+  private boolean peerExist(String peerId, HBaseTestingUtility util) throws IOException {
+    return util.getAdmin().listReplicationPeers().stream()
+      .anyMatch(p -> peerId.equals(p.getPeerId()));
   }
 
   protected final void addPeer(String peerId, TableName tableName) throws Exception {
-    if (!peerExist(peerId)) {
-      ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
-        .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer())
-        .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
-      hbaseAdmin.addReplicationPeer(peerId, builder.build());
+    addPeer(peerId, tableName, UTIL1, UTIL2);
+  }
+
+  protected final void addPeer(String peerId, TableName tableName, HBaseTestingUtility source,
+    HBaseTestingUtility target) throws Exception {
+    if (peerExist(peerId, source)) {
+      return;
     }
+    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
+      .setClusterKey(target.getClusterKey()).setSerial(isSerialPeer())
+      .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
+    source.getAdmin().addReplicationPeer(peerId, builder.build());
   }
 
   @Before
@@ -290,8 +301,12 @@ public class TestReplicationBase {
   }
 
   protected final void removePeer(String peerId) throws Exception {
-    if (peerExist(peerId)) {
-      hbaseAdmin.removeReplicationPeer(peerId);
+    removePeer(peerId, UTIL1);
+  }
+
+  protected final void removePeer(String peerId, HBaseTestingUtility util) throws Exception {
+    if (peerExist(peerId, util)) {
+      util.getAdmin().removeReplicationPeer(peerId);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index 1e26a940b2f..762945d745b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -113,15 +113,20 @@ public class TestReplicationWALEntryFilters {
     Entry userEntryEmpty = createEntry(null);
 
     // no scopes
-    // now we will not filter out entries without a replication scope since serial replication still
-    // need the sequence id, but the cells will all be filtered out.
+    assertNull(filter.filter(userEntry));
+    // now for serial replication, we will not filter out entries without a replication scope since
+    // serial replication still needs the sequence id, but the cells will all be filtered out.
+    filter.setSerial(true);
     assertTrue(filter.filter(userEntry).getEdit().isEmpty());
+    filter.setSerial(false);
 
     // empty scopes
-    // ditto
     TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     userEntry = createEntry(scopes, a, b);
+    assertNull(filter.filter(userEntry));
+    filter.setSerial(true);
     assertTrue(filter.filter(userEntry).getEdit().isEmpty());
+    filter.setSerial(false);
 
     // different scope
     scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);

Reply via email to