This is an automated email from the ASF dual-hosted git repository.
taklwu pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28957 by this push:
new a7a6d3c2726 HBASE-29219 Ignore Empty WAL Files While Consuming
Backed-Up WAL Files (#7106)
a7a6d3c2726 is described below
commit a7a6d3c2726c15fb9324b75471a9e166c0ec5848
Author: vinayak hegde <[email protected]>
AuthorDate: Wed Jun 25 01:28:51 2025 +0530
HBASE-29219 Ignore Empty WAL Files While Consuming Backed-Up WAL Files
(#7106)
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
Reviewed by: Kota-SH <[email protected]>
Reviewed by: Kevin Geiszler <[email protected]>
---
.../hadoop/hbase/backup/impl/BackupAdminImpl.java | 2 +
.../hadoop/hbase/mapreduce/WALInputFormat.java | 11 ++++-
.../apache/hadoop/hbase/mapreduce/WALPlayer.java | 18 +++++++++
.../hadoop/hbase/mapreduce/TestWALInputFormat.java | 47 ++++++++++++++++++++++
.../hadoop/hbase/mapreduce/TestWALPlayer.java | 46 +++++++++++++++++++++
5 files changed, 122 insertions(+), 2 deletions(-)
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index 1e91258ba6c..e82d9804f9d 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -23,6 +23,7 @@ import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONT
import static
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
import static
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
import java.io.IOException;
import java.text.ParseException;
@@ -846,6 +847,7 @@ public class BackupAdminImpl implements BackupAdmin {
Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
+ conf.setBoolean(IGNORE_EMPTY_FILES, true);
Tool walPlayer = new WALPlayer();
walPlayer.setConf(conf);
return walPlayer;
diff --git
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index 8d6e91633f7..a49cdca5a84 100644
---
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -328,14 +328,21 @@ public class WALInputFormat extends InputFormat<WALKey,
WALEdit> {
throw e;
}
}
+
+ boolean ignoreEmptyFiles =
+ conf.getBoolean(WALPlayer.IGNORE_EMPTY_FILES,
WALPlayer.DEFAULT_IGNORE_EMPTY_FILES);
List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
for (FileStatus file : allFiles) {
+ if (ignoreEmptyFiles && file.getLen() == 0) {
+ LOG.warn("Ignoring empty file: " + file.getPath());
+ continue;
+ }
splits.add(new WALSplit(file.getPath().toString(), file.getLen(),
startTime, endTime));
}
return splits;
}
- private Path[] getInputPaths(Configuration conf) {
+ Path[] getInputPaths(Configuration conf) {
String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
return StringUtils
.stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY,
",")));
@@ -349,7 +356,7 @@ public class WALInputFormat extends InputFormat<WALKey,
WALEdit> {
* equal to this value else we will filter out the file. If
name does not seem to
* have a timestamp, we will just return it w/o filtering.
*/
- private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime,
long endTime)
+ List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long
endTime)
throws IOException {
List<FileStatus> result = new ArrayList<>();
LOG.debug("Scanning " + dir.toString() + " for WAL files");
diff --git
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 5e2dc0902e0..cea6da97649 100644
---
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -81,6 +81,24 @@ public class WALPlayer extends Configured implements Tool {
public final static String IGNORE_MISSING_FILES =
"wal.input.ignore.missing.files";
public final static String MULTI_TABLES_SUPPORT = "wal.multi.tables.support";
+ /**
+ * Configuration flag that controls how the WALPlayer handles empty input
WAL files.
+ * <p>
+ * If set to {@code true}, the WALPlayer will skip empty (zero-length) files,
logging a warning
+ * for each one. This is useful in scenarios where such files are
expected (e.g., due to
+ * partial writes or cleanup operations).
+ * </p>
+ * <p>
+ * If set to {@code false} (default), the WALPlayer will throw an exception
when it encounters an
+ * empty or un-parsable WAL file. This is useful for catching unexpected
data issues early.
+ * </p>
+ * <p>
+ * Default value: {@link #DEFAULT_IGNORE_EMPTY_FILES} ({@code false})
+ * </p>
+ */
+ public final static String IGNORE_EMPTY_FILES =
"wal.input.ignore.empty.files";
+ public final static boolean DEFAULT_IGNORE_EMPTY_FILES = false;
+
protected static final String tableSeparator = ";";
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
diff --git
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java
index 70602a37166..b31ea014977 100644
---
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java
+++
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java
@@ -18,16 +18,23 @@
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -74,4 +81,44 @@ public class TestWALInputFormat {
WALInputFormat.addFile(lfss, lfs, now, now);
assertEquals(8, lfss.size());
}
+
+ @Test
+ public void testEmptyFileIsIgnoredWhenConfigured() throws IOException,
InterruptedException {
+ List<InputSplit> splits = getSplitsForEmptyFile(true);
+ assertTrue("Empty file should be ignored when IGNORE_EMPTY_FILES is true",
splits.isEmpty());
+ }
+
+ @Test
+ public void testEmptyFileIsIncludedWhenNotIgnored() throws IOException,
InterruptedException {
+ List<InputSplit> splits = getSplitsForEmptyFile(false);
+ assertEquals("Empty file should be included when IGNORE_EMPTY_FILES is
false", 1,
+ splits.size());
+ }
+
+ private List<InputSplit> getSplitsForEmptyFile(boolean ignoreEmptyFiles)
+ throws IOException, InterruptedException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, ignoreEmptyFiles);
+
+ JobContext jobContext = Mockito.mock(JobContext.class);
+ Mockito.when(jobContext.getConfiguration()).thenReturn(conf);
+
+ LocatedFileStatus emptyFile = Mockito.mock(LocatedFileStatus.class);
+ Mockito.when(emptyFile.getLen()).thenReturn(0L);
+ Mockito.when(emptyFile.getPath()).thenReturn(new Path("/empty.wal"));
+
+ WALInputFormat inputFormat = new WALInputFormat() {
+ @Override
+ Path[] getInputPaths(Configuration conf) {
+ return new Path[] { new Path("/input") };
+ }
+
+ @Override
+ List<FileStatus> getFiles(FileSystem fs, Path inputPath, long startTime,
long endTime) {
+ return Collections.singletonList(emptyFile);
+ }
+ };
+
+ return inputFormat.getSplits(jobContext, "", "");
+ }
}
diff --git
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index b39d04802c9..7818f8d2f73 100644
---
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -22,6 +22,7 @@ import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -31,10 +32,12 @@ import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -338,4 +341,47 @@ public class TestWALPlayer {
}
+ @Test
+ public void testIgnoreEmptyWALFiles() throws Exception {
+ Path inputDir = createEmptyWALFile("empty-wal-dir");
+ FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+ Path emptyWAL = new Path(inputDir, "empty.wal");
+
+ assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL));
+ assertEquals("WAL file should be 0 bytes", 0,
dfs.getFileStatus(emptyWAL).getLen());
+
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, true);
+
+ int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] {
inputDir.toString() });
+ assertEquals("WALPlayer should exit cleanly even with empty files", 0,
exitCode);
+ }
+
+ @Test
+ public void testFailOnEmptyWALFilesWhenNotIgnored() throws Exception {
+ Path inputDir = createEmptyWALFile("fail-empty-wal-dir");
+ FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+ Path emptyWAL = new Path(inputDir, "empty.wal");
+
+ assertTrue("Empty WAL file should exist", dfs.exists(emptyWAL));
+ assertEquals("WAL file should be 0 bytes", 0,
dfs.getFileStatus(emptyWAL).getLen());
+
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, false);
+
+ int exitCode = ToolRunner.run(conf, new WALPlayer(conf), new String[] {
inputDir.toString() });
+ assertNotEquals("WALPlayer should fail on empty files when not ignored",
0, exitCode);
+ }
+
+ private Path createEmptyWALFile(String walDir) throws IOException {
+ FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+ Path inputDir = new Path("/" + walDir);
+ dfs.mkdirs(inputDir);
+
+ Path emptyWAL = new Path(inputDir, "empty.wal");
+ FSDataOutputStream out = dfs.create(emptyWAL);
+ out.close(); // Explicitly closing the stream
+
+ return inputDir;
+ }
}