This is an automated email from the ASF dual-hosted git repository.
taklwu pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28957 by this push:
new cb9edf9fd2a HBASE-29441 ReplicationSourceShipper should delegate the
empty wal entries handling to ReplicationEndpoint (#7145)
cb9edf9fd2a is described below
commit cb9edf9fd2a77d1a45bda89b217851f1a4c8c802
Author: vinayak hegde <[email protected]>
AuthorDate: Wed Jul 16 23:11:35 2025 +0530
HBASE-29441 ReplicationSourceShipper should delegate the empty wal entries
handling to ReplicationEndpoint (#7145)
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
---
.../ContinuousBackupReplicationEndpoint.java | 9 ++++
.../hbase/replication/EmptyEntriesPolicy.java | 34 ++++++++++++
.../hbase/replication/ReplicationEndpoint.java | 18 +++++++
.../regionserver/ReplicationSourceShipper.java | 33 ++++++++++--
.../regionserver/TestReplicationSource.java | 63 ++++++++++++++++++++++
5 files changed, 153 insertions(+), 4 deletions(-)
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index bf3fbd531bf..2442e0789a8 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -205,6 +206,14 @@ public class ContinuousBackupReplicationEndpoint extends
BaseReplicationEndpoint
notifyStarted();
}
+ @Override
+ public EmptyEntriesPolicy getEmptyEntriesPolicy() {
+ // Since this endpoint writes to S3 asynchronously, an empty entry batch
+ // does not guarantee that all previously submitted entries were persisted.
+ // Hence, avoid committing the WAL position.
+ return EmptyEntriesPolicy.SUBMIT;
+ }
+
@Override
public ReplicationResult replicate(ReplicateContext replicateContext) {
final List<WAL.Entry> entries = replicateContext.getEntries();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/EmptyEntriesPolicy.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/EmptyEntriesPolicy.java
new file mode 100644
index 00000000000..5a5d8ab754c
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/EmptyEntriesPolicy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Policy an endpoint uses to tell the replication source how to treat an empty entry batch.
+ * <ul>
+ * <li>{@code COMMIT}: the position is fully committed, so the WAL position is advanced.</li>
+ * <li>{@code SUBMIT}: the batch is only submitted; the WAL position is not advanced. Use this when
+ * the endpoint buffers entries (e.g., in open files on S3) and persists them later.</li>
+ * </ul>
+ */
+@InterfaceAudience.Private
+public enum EmptyEntriesPolicy {
+ COMMIT,
+ SUBMIT
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index fc5c2bf6265..fbb6b6b9ef1 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -291,4 +291,22 @@ public interface ReplicationEndpoint extends
ReplicationPeerConfigListener {
* @throws IllegalStateException if this service's state isn't FAILED.
*/
Throwable failureCause();
+
+ /**
+ * Defines the behavior when the replication source encounters an empty
entry batch.
+ * <p>
+ * By default, this method returns {@link EmptyEntriesPolicy#COMMIT},
meaning the replication
+ * source can safely consider the WAL position as committed and move on.
+ * </p>
+ * <p>
+ * However, certain endpoints like backup or asynchronous S3 writers may
delay persistence (e.g.,
+ * writing to temporary files or buffers). In those cases, returning
+ * {@link EmptyEntriesPolicy#SUBMIT} avoids incorrectly advancing WAL
position and risking data
+ * loss.
+ * </p>
+ * @return the {@link EmptyEntriesPolicy} to apply for empty entry batches.
+ */
+ default EmptyEntriesPolicy getEmptyEntriesPolicy() {
+ return EmptyEntriesPolicy.COMMIT;
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index ee819faa77b..f45c8762683 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static
org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
import static
org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
+import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -27,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -150,13 +152,25 @@ public class ReplicationSourceShipper extends Thread {
}
/**
- * Do the shipping logic
+ * Do the shipping logic.
*/
- private void shipEdits(WALEntryBatch entryBatch) {
+ @RestrictedApi(
+ explanation = "Package-private for test visibility only. Do not use
outside tests.",
+ link = "",
+ allowedOnPath =
"(.*/src/test/.*|.*/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java)")
+ void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
- updateLogPosition(entryBatch, ReplicationResult.COMMITTED);
+ /*
+ * Delegate to the endpoint to decide how to treat empty entry batches.
In most replication
+ * flows, receiving an empty entry batch means that everything so far
has been successfully
+ * replicated and committed — so it's safe to mark the WAL position as
committed (COMMIT).
+ * However, some endpoints (e.g., asynchronous S3 backups) may buffer
writes and delay actual
+ * persistence. In such cases, we must avoid committing the WAL position
prematurely.
+ */
+ final ReplicationResult result = getReplicationResult();
+ updateLogPosition(entryBatch, result);
return;
}
int currentSize = (int) entryBatch.getHeapSize();
@@ -232,6 +246,13 @@ public class ReplicationSourceShipper extends Thread {
}
}
+ private ReplicationResult getReplicationResult() {
+ EmptyEntriesPolicy policy =
source.getReplicationEndpoint().getEmptyEntriesPolicy();
+ return (policy == EmptyEntriesPolicy.COMMIT)
+ ? ReplicationResult.COMMITTED
+ : ReplicationResult.SUBMITTED;
+ }
+
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = source.getPeerId();
if (peerId.contains("-")) {
@@ -256,7 +277,11 @@ public class ReplicationSourceShipper extends Thread {
}
}
- private boolean updateLogPosition(WALEntryBatch batch, ReplicationResult
replicated) {
+ @RestrictedApi(
+ explanation = "Package-private for test visibility only. Do not use
outside tests.",
+ link = "",
+ allowedOnPath =
"(.*/src/test/.*|.*/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java)")
+ boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated)
{
boolean updated = false;
// if end of file is true, then the logPositionAndCleanOldLogs method will
remove the file
// record on zk, so let's call it. The last wal position maybe zero if end
of file is true and
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 37af52eb93b..25eef51ff68 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -53,11 +53,13 @@ import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.replication.ReplicationResult;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -492,6 +494,67 @@ public class TestReplicationSource {
}
+ /**
+ * Custom ReplicationEndpoint that simulates an asynchronous target like S3
or cloud storage. In
+ * this case, empty entry batches should not cause WAL position to be
committed immediately.
+ */
+ public static class AsyncReplicationEndpoint extends
DoNothingReplicationEndpoint {
+ @Override
+ public EmptyEntriesPolicy getEmptyEntriesPolicy() {
+ return EmptyEntriesPolicy.SUBMIT;
+ }
+ }
+
+ /**
+ * Default synchronous ReplicationEndpoint that treats empty entry batches
as a signal to commit
+ * WAL position, assuming all entries pushed before were safely replicated.
+ */
+ public static class SyncReplicationEndpoint extends
DoNothingReplicationEndpoint {
+ // Inherits default COMMIT behavior
+ }
+
+ /**
+ * Verifies that ReplicationSourceShipper commits the WAL position when
using a synchronous
+ * endpoint and the entry batch is empty.
+ */
+ @Test
+ public void testEmptyBatchCommitsPositionForCommitEndpoint() {
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ ReplicationSource source = Mockito.mock(ReplicationSource.class);
+ Mockito.when(source.getReplicationEndpoint()).thenReturn(new
SyncReplicationEndpoint());
+
+ ReplicationSourceShipper shipper =
+ Mockito.spy(new ReplicationSourceShipper(conf, "testGroup", source,
null));
+
+ WALEntryBatch emptyBatch = new WALEntryBatch(0, new Path("test-wal"));
+
+ shipper.shipEdits(emptyBatch);
+
+ // With default (COMMIT) policy, empty entry batch should advance WAL
position
+ Mockito.verify(shipper).updateLogPosition(emptyBatch,
ReplicationResult.COMMITTED);
+ }
+
+ /**
+ * Verifies that ReplicationSourceShipper does NOT commit the WAL position
when using an
+ * asynchronous endpoint and the entry batch is empty.
+ */
+ @Test
+ public void testEmptyBatchSubmitsPositionForSubmitEndpoint() {
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ ReplicationSource source = Mockito.mock(ReplicationSource.class);
+ Mockito.when(source.getReplicationEndpoint()).thenReturn(new
AsyncReplicationEndpoint());
+
+ ReplicationSourceShipper shipper =
+ Mockito.spy(new ReplicationSourceShipper(conf, "testGroup", source,
null));
+
+ WALEntryBatch emptyBatch = new WALEntryBatch(0, new Path("test-wal"));
+
+ shipper.shipEdits(emptyBatch);
+
+ // With SUBMIT policy, empty entry batch should NOT advance WAL position
+ Mockito.verify(shipper).updateLogPosition(emptyBatch,
ReplicationResult.SUBMITTED);
+ }
+
private RegionServerServices setupForAbortTests(ReplicationSource rs,
Configuration conf,
String endpointName) throws IOException {
conf.setInt("replication.source.maxretriesmultiplier", 1);