This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new f3c0ce02312 HBASE-28951 Handle simultaneous WAL splitting to recovered
edits by multiple worker (#7228) (#7075)
f3c0ce02312 is described below
commit f3c0ce02312d7161160d6f8c9cd7aadeb710d3c0
Author: Umesh <[email protected]>
AuthorDate: Wed Aug 20 07:40:47 2025 +0530
HBASE-28951 Handle simultaneous WAL splitting to recovered edits by
multiple worker (#7228) (#7075)
Signed-off-by: Andrew Purtell <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Aman Poonia <[email protected]>
---
.../wal/AbstractRecoveredEditsOutputSink.java | 100 ++++++++++++++++-----
.../org/apache/hadoop/hbase/wal/WALSplitUtil.java | 19 ++--
.../org/apache/hadoop/hbase/wal/TestWALSplit.java | 69 +++++++++++++-
3 files changed, 156 insertions(+), 32 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
index 0411b1b76bd..84021569272 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
@@ -22,6 +22,9 @@ import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
import java.io.EOFException;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -30,8 +33,10 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -45,6 +50,7 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
private static final Logger LOG =
LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
private final WALSplitter walSplitter;
private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new
ConcurrentHashMap<>();
+ private static final int MAX_RENAME_RETRY_COUNT = 5;
public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter,
WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int
numWriters) {
@@ -55,9 +61,12 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
/** Returns a writer that wraps a {@link WALProvider.Writer} and its Path.
Caller should close. */
protected RecoveredEditsWriter createRecoveredEditsWriter(TableName
tableName, byte[] region,
long seqId) throws IOException {
+ // If multiple worker are splitting a WAL at a same time, both should use
unique file name to
+ // avoid conflict
Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
walSplitter.getFileBeingSplit().getPath().getName(),
walSplitter.getTmpDirName(),
- walSplitter.conf);
+ walSplitter.conf, getWorkerNameComponent());
+
if (walSplitter.walFS.exists(regionEditsPath)) {
LOG.warn("Found old edits file. It could be the "
+ "result of a previous failed split attempt. Deleting " +
regionEditsPath + ", length="
@@ -73,6 +82,20 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
}
+ private String getWorkerNameComponent() {
+ if (walSplitter.rsServices == null) {
+ return "";
+ }
+ try {
+ return URLEncoder.encode(
+ walSplitter.rsServices.getServerName().toShortString()
+ .replace(Addressing.HOSTNAME_PORT_SEPARATOR,
ServerName.SERVERNAME_SEPARATOR),
+ StandardCharsets.UTF_8.name());
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("URLEncoder doesn't support UTF-8", e);
+ }
+ }
+
/**
* abortRecoveredEditsWriter closes the editsWriter, but does not rename and
finalize the
* recovered edits WAL files. Please see HBASE-28569.
@@ -103,22 +126,40 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path,
regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName)));
try {
- if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) {
- deleteOneWithFewerEntries(editsWriter, dst);
- }
// Skip the unit tests which create a splitter that reads and
// writes the data without touching disk.
// TestHLogSplit#testThreading is an example.
if (walSplitter.walFS.exists(editsWriter.path)) {
- if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
- final String errorMsg =
- "Failed renaming recovered edits " + editsWriter.path + " to " +
dst;
+ boolean retry;
+ int retryCount = 0;
+ do {
+ retry = false;
+ retryCount++;
+ // If rename is successful, it means recovered edits are
successfully places at right
+ // place but if rename fails, there can be below reasons
+ // 1. dst already exist - in this case if dst have desired edits we
will keep the dst,
+ // delete the editsWriter.path and consider this success else if dst
have fewer edits, we
+ // will delete the dst and retry the rename
+ // 2. parent directory does not exit - in one edge case this is
possible when this worker
+ // got stuck before rename and HMaster get another worker to split
the wal, SCP will
+ // proceed and once region get opened on one RS, we delete the
recovered.edits directory,
+ // in this case there is no harm in failing this procedure after
retry exhausted.
+ if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
+ retry = deleteOneWithFewerEntriesToRetry(editsWriter, dst);
+ }
+ } while (retry && retryCount < MAX_RENAME_RETRY_COUNT);
+
+ // If we are out of loop with retry flag `true` it means we have
exhausted the retries.
+ if (retry) {
+ final String errorMsg = "Failed renaming recovered edits " +
editsWriter.path + " to "
+ + dst + " in " + MAX_RENAME_RETRY_COUNT + " retries";
updateStatusWithMsg(errorMsg);
throw new IOException(errorMsg);
+ } else {
+ final String renameEditMsg = "Rename recovered edits " +
editsWriter.path + " to " + dst;
+ LOG.info(renameEditMsg);
+ updateStatusWithMsg(renameEditMsg);
}
- final String renameEditMsg = "Rename recovered edits " +
editsWriter.path + " to " + dst;
- LOG.info(renameEditMsg);
- updateStatusWithMsg(renameEditMsg);
}
} catch (IOException ioe) {
final String errorMsg = "Could not rename recovered edits " +
editsWriter.path + " to " + dst;
@@ -186,35 +227,48 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
}
// delete the one with fewer wal entries
- private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter,
Path dst)
+ private boolean deleteOneWithFewerEntriesToRetry(RecoveredEditsWriter
editsWriter, Path dst)
throws IOException {
- long dstMinLogSeqNum = -1L;
- try (WAL.Reader reader =
walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
- WAL.Entry entry = reader.next();
- if (entry != null) {
- dstMinLogSeqNum = entry.getKey().getSequenceId();
- }
- } catch (EOFException e) {
- LOG.debug("Got EOF when reading first WAL entry from {}, an empty or
broken WAL file?", dst,
- e);
+ if (!walSplitter.walFS.exists(dst)) {
+ LOG.info("dst {} doesn't exist, need to retry ", dst);
+ return true;
}
- if (editsWriter.minLogSeqNum < dstMinLogSeqNum) {
+
+ if (isDstHasFewerEntries(editsWriter, dst)) {
LOG.warn("Found existing old edits file. It could be the result of a
previous failed"
+ " split attempt or we have duplicated wal entries. Deleting " + dst
+ ", length="
- + walSplitter.walFS.getFileStatus(dst).getLen());
+ + walSplitter.walFS.getFileStatus(dst).getLen() + " and retry is
needed");
if (!walSplitter.walFS.delete(dst, false)) {
LOG.warn("Failed deleting of old {}", dst);
throw new IOException("Failed deleting of old " + dst);
}
+ return true;
} else {
LOG
.warn("Found existing old edits file and we have less entries.
Deleting " + editsWriter.path
- + ", length=" +
walSplitter.walFS.getFileStatus(editsWriter.path).getLen());
+ + ", length=" +
walSplitter.walFS.getFileStatus(editsWriter.path).getLen()
+ + " and no retry needed as dst has all edits");
if (!walSplitter.walFS.delete(editsWriter.path, false)) {
LOG.warn("Failed deleting of {}", editsWriter.path);
throw new IOException("Failed deleting of " + editsWriter.path);
}
+ return false;
+ }
+ }
+
+ private boolean isDstHasFewerEntries(RecoveredEditsWriter editsWriter, Path
dst)
+ throws IOException {
+ long dstMinLogSeqNum = -1L;
+ try (WAL.Reader reader =
walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
+ WAL.Entry entry = reader.next();
+ if (entry != null) {
+ dstMinLogSeqNum = entry.getKey().getSequenceId();
+ }
+ } catch (EOFException e) {
+ LOG.debug("Got EOF when reading first WAL entry from {}, an empty or
broken WAL file?", dst,
+ e);
}
+ return editsWriter.minLogSeqNum < dstMinLogSeqNum;
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index 218777a55f6..51cf6742b80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -150,17 +150,19 @@ public final class WALSplitUtil {
* /hbase/some_table/2323432434/recovered.edits/2332. This method also
ensures existence of
* RECOVERED_EDITS_DIR under the region creating it if necessary. And also
set storage policy for
* RECOVERED_EDITS_DIR if WAL_STORAGE_POLICY is configured.
- * @param tableName the table name
- * @param encodedRegionName the encoded region name
- * @param seqId the sequence id which used to generate file name
- * @param fileNameBeingSplit the file being split currently. Used to
generate tmp file name.
- * @param tmpDirName of the directory used to sideline old recovered
edits file
- * @param conf configuration
+ * @param tableName the table name
+ * @param encodedRegionName the encoded region name
+ * @param seqId the sequence id which used to generate file
name
+ * @param fileNameBeingSplit the file being split currently. Used to
generate tmp file name.
+ * @param tmpDirName of the directory used to sideline old
recovered edits file
+ * @param conf configuration
+ * @param workerNameComponent the worker name component for the file name
* @return Path to file into which to dump split log edits.
*/
@SuppressWarnings("deprecation")
static Path getRegionSplitEditsPath(TableName tableName, byte[]
encodedRegionName, long seqId,
- String fileNameBeingSplit, String tmpDirName, Configuration conf) throws
IOException {
+ String fileNameBeingSplit, String tmpDirName, Configuration conf, String
workerNameComponent)
+ throws IOException {
FileSystem walFS = CommonFSUtils.getWALFileSystem(conf);
Path tableDir = CommonFSUtils.getWALTableDir(conf, tableName);
String encodedRegionNameStr = Bytes.toString(encodedRegionName);
@@ -192,7 +194,8 @@ public final class WALSplitUtil {
// Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
// region's replayRecoveredEdits will not delete it
String fileName = formatRecoveredEditsFileName(seqId);
- fileName = getTmpRecoveredEditsFileName(fileName + "-" +
fileNameBeingSplit);
+ fileName =
+ getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit + "-" +
workerNameComponent);
return new Path(dir, fileName);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index fec532f89da..73f41604361 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.LastSequenceId;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.security.User;
@@ -100,6 +102,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
/**
@@ -366,6 +369,70 @@ public class TestWALSplit {
}
}
+ // If another worker is assigned to split a WAl and last worker is still
running, both should not
+ // impact each other's progress
+ @Test
+ public void testTwoWorkerSplittingSameWAL() throws IOException,
InterruptedException {
+ int numWriter = 1, entries = 10;
+ generateWALs(numWriter, entries, -1, 0);
+ FileStatus logfile = fs.listStatus(WALDIR)[0];
+ FileSystem spiedFs = Mockito.spy(fs);
+ RegionServerServices zombieRSServices =
Mockito.mock(RegionServerServices.class);
+ RegionServerServices newWorkerRSServices =
Mockito.mock(RegionServerServices.class);
+ Mockito.when(zombieRSServices.getServerName())
+ .thenReturn(ServerName.valueOf("zombie-rs.abc.com,1234,1234567890"));
+ Mockito.when(newWorkerRSServices.getServerName())
+ .thenReturn(ServerName.valueOf("worker-rs.abc.com,1234,1234569870"));
+ Thread zombieWorker = new SplitWALWorker(logfile, spiedFs,
zombieRSServices);
+ Thread newWorker = new SplitWALWorker(logfile, spiedFs,
newWorkerRSServices);
+ zombieWorker.start();
+ newWorker.start();
+ newWorker.join();
+ zombieWorker.join();
+
+ for (String region : REGIONS) {
+ Path[] logfiles = getLogForRegion(TABLE_NAME, region);
+ assertEquals("wrong number of split files for region", numWriter,
logfiles.length);
+
+ int count = 0;
+ for (Path lf : logfiles) {
+ count += countWAL(lf);
+ }
+ assertEquals("wrong number of edits for region " + region, entries,
count);
+ }
+ }
+
+ private class SplitWALWorker extends Thread implements LastSequenceId {
+ final FileStatus logfile;
+ final FileSystem fs;
+ final RegionServerServices rsServices;
+
+ public SplitWALWorker(FileStatus logfile, FileSystem fs,
RegionServerServices rsServices) {
+ super(rsServices.getServerName().toShortString());
+ setDaemon(true);
+ this.fs = fs;
+ this.logfile = logfile;
+ this.rsServices = rsServices;
+ }
+
+ @Override
+ public void run() {
+ try {
+ boolean ret =
+ WALSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, null, this,
null, wals, rsServices);
+ assertTrue("Both splitting should pass", ret);
+ } catch (IOException e) {
+ LOG.warn(getName() + " Worker exiting " + e);
+ }
+ }
+
+ @Override
+ public ClusterStatusProtos.RegionStoreSequenceIds getLastSequenceId(byte[]
encodedRegionName) {
+ return ClusterStatusProtos.RegionStoreSequenceIds.newBuilder()
+ .setLastFlushedSequenceId(HConstants.NO_SEQNUM).build();
+ }
+ }
+
/**
* @see "https://issues.apache.org/jira/browse/HBASE-3020"
*/
@@ -397,7 +464,7 @@ public class TestWALSplit {
private Path createRecoveredEditsPathForRegion() throws IOException {
byte[] encoded =
RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME,
encoded, 1,
- FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
+ FILENAME_BEING_SPLIT, TMPDIRNAME, conf, "");
return p;
}