This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new e4f9c65e80d HBASE-29987 Replication position corruption when WAL file switch detected in ReplicationSourceWALReader run loop (#7909)
e4f9c65e80d is described below

commit e4f9c65e80dd7d5ffb103f782f054e0cc18f5878
Author: Siddharth Khillon <[email protected]>
AuthorDate: Thu Mar 12 20:38:47 2026 -0700

    HBASE-29987 Replication position corruption when WAL file switch detected in ReplicationSourceWALReader run loop (#7909)
    
    When ReplicationSourceWALReader.run() detects a WAL file switch via the
    switched() check, it enqueues an EOF batch but does not update
    currentPosition. If the outer loop restarts (e.g., due to
    WALEntryFilterRetryableException), the new WALEntryStream is created
    with the stale position from the old file, applied to the new file.
    This causes an infinite retry loop (EOFException: Cannot seek after EOF)
    and the corrupted position may be persisted to ZK, surviving restarts.
    
    The fix resets currentPosition to entryStream.getPosition() (which
    returns 0 after dequeueCurrentLog()) before enqueuing the EOF batch.
    
    Includes a regression test that reproduces the bug by setting
    replication.source.nb.capacity=1 to force EOF detection in the run loop
    (at line 153, not inside readWALEntries), combined with a
    WALEntryFilterRetryableException on the first entry of the new file to
    trigger the outer loop restart.
    
    Co-authored-by: skhillon <[email protected]>
    Signed-off-by: Duo Zhang <[email protected]>
---
 .../regionserver/ReplicationSourceWALReader.java   |  1 +
 .../regionserver/TestBasicWALEntryStream.java      | 52 ++++++++++++++++++++++
 2 files changed, 53 insertions(+)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index e421042614c..9951bd70937 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -154,6 +154,7 @@ class ReplicationSourceWALReader extends Thread {
           // first, check if we have switched a file, if so, we need to manually add an EOF entry
           // batch to the queue
           if (currentPath != null && switched(entryStream, currentPath)) {
+            currentPosition = entryStream.getPosition();
             entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath));
             continue;
           }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
index ac21c6619f5..6d73bc2c79a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -882,6 +882,58 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
     }
   }
 
+  /**
+   * Verify that when a WAL file switch is detected via the switched() check in
+   * ReplicationSourceWALReader.run(), currentPosition is reset so that a subsequent
+   * WALEntryFilterRetryableException does not cause the new file to be opened at the old file's
+   * position.
+   */
+  @Test
+  public void testPositionResetOnFileSwitchWithRetryableFilter() throws Exception {
+    appendEntriesToLogAndSync(3);
+    log.rollWriter();
+    AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log;
+    Waiter.waitFor(CONF, 5000,
+      (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
+    appendEntriesToLogAndSync(3);
+
+    // Batch capacity of 1 ensures EOF on WAL A is detected by hasNext() in the run loop
+    // (not inside readWALEntries), which triggers the switched() path.
+    Configuration conf = new Configuration(CONF);
+    conf.setInt("replication.source.nb.capacity", 1);
+
+    AtomicInteger totalFilterCalls = new AtomicInteger(0);
+    AtomicBoolean threwOnce = new AtomicBoolean(false);
+    WALEntryFilter filter = entry -> {
+      int callNum = totalFilterCalls.incrementAndGet();
+      if (callNum > 3 && !threwOnce.get()) {
+        threwOnce.set(true);
+        throw new WALEntryFilterRetryableException("simulated filter failure after file switch");
+      }
+      return entry;
+    };
+
+    ReplicationSource source = mockReplicationSource(false, conf);
+    when(source.isPeerEnabled()).thenReturn(true);
+    ReplicationSourceWALReader reader =
+      new ReplicationSourceWALReader(fs, conf, logQueue, 0, filter, source, fakeWalGroupId);
+    reader.start();
+
+    int totalEntries = 0;
+    long deadline = System.currentTimeMillis() + 30000;
+    while (totalEntries < 6) {
+      long remaining = deadline - System.currentTimeMillis();
+      assertTrue("Reader appears stuck - likely position corruption. Only got " + totalEntries
+        + " of 6 entries", remaining > 0);
+      WALEntryBatch batch = reader.poll(1);
+      if (batch != null && batch != WALEntryBatch.NO_MORE_DATA) {
+        totalEntries += batch.getNbEntries();
+      }
+    }
+    assertEquals(6, totalEntries);
+    assertTrue("Filter should have thrown at least once", threwOnce.get());
+  }
+
   private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
     private int filteredWALEntryCount = -1;
     private int walEntryCount = 0;

Reply via email to