This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new f3c0ce02312 HBASE-28951 Handle simultaneous WAL splitting to recovered 
edits by multiple workers (#7228) (#7075)
f3c0ce02312 is described below

commit f3c0ce02312d7161160d6f8c9cd7aadeb710d3c0
Author: Umesh <[email protected]>
AuthorDate: Wed Aug 20 07:40:47 2025 +0530

    HBASE-28951 Handle simultaneous WAL splitting to recovered edits by 
multiple workers (#7228) (#7075)
    
    Signed-off-by: Andrew Purtell <[email protected]>
    Signed-off-by: Duo Zhang <[email protected]>
    Signed-off-by: Aman Poonia <[email protected]>
---
 .../wal/AbstractRecoveredEditsOutputSink.java      | 100 ++++++++++++++++-----
 .../org/apache/hadoop/hbase/wal/WALSplitUtil.java  |  19 ++--
 .../org/apache/hadoop/hbase/wal/TestWALSplit.java  |  69 +++++++++++++-
 3 files changed, 156 insertions(+), 32 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
index 0411b1b76bd..84021569272 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
@@ -22,6 +22,9 @@ import static 
org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -30,8 +33,10 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -45,6 +50,7 @@ abstract class AbstractRecoveredEditsOutputSink extends 
OutputSink {
   private static final Logger LOG = 
LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
   private final WALSplitter walSplitter;
   private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new 
ConcurrentHashMap<>();
+  private static final int MAX_RENAME_RETRY_COUNT = 5;
 
   public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter,
     WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int 
numWriters) {
@@ -55,9 +61,12 @@ abstract class AbstractRecoveredEditsOutputSink extends 
OutputSink {
   /** Returns a writer that wraps a {@link WALProvider.Writer} and its Path. 
Caller should close. */
   protected RecoveredEditsWriter createRecoveredEditsWriter(TableName 
tableName, byte[] region,
     long seqId) throws IOException {
+    // If multiple workers are splitting a WAL at the same time, each should use 
a unique file name to
+    // avoid conflicts
     Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
       walSplitter.getFileBeingSplit().getPath().getName(), 
walSplitter.getTmpDirName(),
-      walSplitter.conf);
+      walSplitter.conf, getWorkerNameComponent());
+
     if (walSplitter.walFS.exists(regionEditsPath)) {
       LOG.warn("Found old edits file. It could be the "
         + "result of a previous failed split attempt. Deleting " + 
regionEditsPath + ", length="
@@ -73,6 +82,20 @@ abstract class AbstractRecoveredEditsOutputSink extends 
OutputSink {
     return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
   }
 
+  private String getWorkerNameComponent() {
+    if (walSplitter.rsServices == null) {
+      return "";
+    }
+    try {
+      return URLEncoder.encode(
+        walSplitter.rsServices.getServerName().toShortString()
+          .replace(Addressing.HOSTNAME_PORT_SEPARATOR, 
ServerName.SERVERNAME_SEPARATOR),
+        StandardCharsets.UTF_8.name());
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("URLEncoder doesn't support UTF-8", e);
+    }
+  }
+
   /**
    * abortRecoveredEditsWriter closes the editsWriter, but does not rename and 
finalize the
    * recovered edits WAL files. Please see HBASE-28569.
@@ -103,22 +126,40 @@ abstract class AbstractRecoveredEditsOutputSink extends 
OutputSink {
     Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path,
       
regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName)));
     try {
-      if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) {
-        deleteOneWithFewerEntries(editsWriter, dst);
-      }
       // Skip the unit tests which create a splitter that reads and
       // writes the data without touching disk.
       // TestHLogSplit#testThreading is an example.
       if (walSplitter.walFS.exists(editsWriter.path)) {
-        if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
-          final String errorMsg =
-            "Failed renaming recovered edits " + editsWriter.path + " to " + 
dst;
+        boolean retry;
+        int retryCount = 0;
+        do {
+          retry = false;
+          retryCount++;
+          // If the rename is successful, it means the recovered edits were 
successfully placed at
+          // the right place, but if the rename fails, there can be the 
following reasons:
+          // 1. dst already exists - in this case, if dst has the desired edits 
we will keep dst,
+          // delete editsWriter.path and consider this a success; else if dst 
has fewer edits, we
+          // will delete dst and retry the rename
+          // 2. the parent directory does not exist - in one edge case this is 
possible when this
+          // worker got stuck before the rename and the HMaster got another 
worker to split the
+          // WAL; the SCP will proceed and, once the region gets opened on one 
RS, we delete the
+          // recovered.edits directory. In this case there is no harm in 
failing this procedure
+          // after the retries are exhausted.
+          if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
+            retry = deleteOneWithFewerEntriesToRetry(editsWriter, dst);
+          }
+        } while (retry && retryCount < MAX_RENAME_RETRY_COUNT);
+
+        // If we are out of the loop with the retry flag `true`, it means we 
have exhausted the retries.
+        if (retry) {
+          final String errorMsg = "Failed renaming recovered edits " + 
editsWriter.path + " to "
+            + dst + " in " + MAX_RENAME_RETRY_COUNT + " retries";
           updateStatusWithMsg(errorMsg);
           throw new IOException(errorMsg);
+        } else {
+          final String renameEditMsg = "Rename recovered edits " + 
editsWriter.path + " to " + dst;
+          LOG.info(renameEditMsg);
+          updateStatusWithMsg(renameEditMsg);
         }
-        final String renameEditMsg = "Rename recovered edits " + 
editsWriter.path + " to " + dst;
-        LOG.info(renameEditMsg);
-        updateStatusWithMsg(renameEditMsg);
       }
     } catch (IOException ioe) {
       final String errorMsg = "Could not rename recovered edits " + 
editsWriter.path + " to " + dst;
@@ -186,35 +227,48 @@ abstract class AbstractRecoveredEditsOutputSink extends 
OutputSink {
   }
 
   // delete the one with fewer wal entries
-  private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, 
Path dst)
+  private boolean deleteOneWithFewerEntriesToRetry(RecoveredEditsWriter 
editsWriter, Path dst)
     throws IOException {
-    long dstMinLogSeqNum = -1L;
-    try (WAL.Reader reader = 
walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
-      WAL.Entry entry = reader.next();
-      if (entry != null) {
-        dstMinLogSeqNum = entry.getKey().getSequenceId();
-      }
-    } catch (EOFException e) {
-      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or 
broken WAL file?", dst,
-        e);
+    if (!walSplitter.walFS.exists(dst)) {
+      LOG.info("dst {} doesn't exist, need to retry ", dst);
+      return true;
     }
-    if (editsWriter.minLogSeqNum < dstMinLogSeqNum) {
+
+    if (isDstHasFewerEntries(editsWriter, dst)) {
       LOG.warn("Found existing old edits file. It could be the result of a 
previous failed"
         + " split attempt or we have duplicated wal entries. Deleting " + dst 
+ ", length="
-        + walSplitter.walFS.getFileStatus(dst).getLen());
+        + walSplitter.walFS.getFileStatus(dst).getLen() + " and retry is 
needed");
       if (!walSplitter.walFS.delete(dst, false)) {
         LOG.warn("Failed deleting of old {}", dst);
         throw new IOException("Failed deleting of old " + dst);
       }
+      return true;
     } else {
       LOG
         .warn("Found existing old edits file and we have less entries. 
Deleting " + editsWriter.path
-          + ", length=" + 
walSplitter.walFS.getFileStatus(editsWriter.path).getLen());
+          + ", length=" + 
walSplitter.walFS.getFileStatus(editsWriter.path).getLen()
+          + " and no retry needed as dst has all edits");
       if (!walSplitter.walFS.delete(editsWriter.path, false)) {
         LOG.warn("Failed deleting of {}", editsWriter.path);
         throw new IOException("Failed deleting of " + editsWriter.path);
       }
+      return false;
+    }
+  }
+
+  private boolean isDstHasFewerEntries(RecoveredEditsWriter editsWriter, Path 
dst)
+    throws IOException {
+    long dstMinLogSeqNum = -1L;
+    try (WAL.Reader reader = 
walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
+      WAL.Entry entry = reader.next();
+      if (entry != null) {
+        dstMinLogSeqNum = entry.getKey().getSequenceId();
+      }
+    } catch (EOFException e) {
+      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or 
broken WAL file?", dst,
+        e);
     }
+    return editsWriter.minLogSeqNum < dstMinLogSeqNum;
   }
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index 218777a55f6..51cf6742b80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -150,17 +150,19 @@ public final class WALSplitUtil {
    * /hbase/some_table/2323432434/recovered.edits/2332. This method also 
ensures existence of
    * RECOVERED_EDITS_DIR under the region creating it if necessary. And also 
set storage policy for
    * RECOVERED_EDITS_DIR if WAL_STORAGE_POLICY is configured.
-   * @param tableName          the table name
-   * @param encodedRegionName  the encoded region name
-   * @param seqId              the sequence id which used to generate file name
-   * @param fileNameBeingSplit the file being split currently. Used to 
generate tmp file name.
-   * @param tmpDirName         of the directory used to sideline old recovered 
edits file
-   * @param conf               configuration
+   * @param tableName           the table name
+   * @param encodedRegionName   the encoded region name
+   * @param seqId               the sequence id which used to generate file 
name
+   * @param fileNameBeingSplit  the file being split currently. Used to 
generate tmp file name.
+   * @param tmpDirName          of the directory used to sideline old 
recovered edits file
+   * @param conf                configuration
+   * @param workerNameComponent the worker name component for the file name
    * @return Path to file into which to dump split log edits.
    */
   @SuppressWarnings("deprecation")
   static Path getRegionSplitEditsPath(TableName tableName, byte[] 
encodedRegionName, long seqId,
-    String fileNameBeingSplit, String tmpDirName, Configuration conf) throws 
IOException {
+    String fileNameBeingSplit, String tmpDirName, Configuration conf, String 
workerNameComponent)
+    throws IOException {
     FileSystem walFS = CommonFSUtils.getWALFileSystem(conf);
     Path tableDir = CommonFSUtils.getWALTableDir(conf, tableName);
     String encodedRegionNameStr = Bytes.toString(encodedRegionName);
@@ -192,7 +194,8 @@ public final class WALSplitUtil {
     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
     // region's replayRecoveredEdits will not delete it
     String fileName = formatRecoveredEditsFileName(seqId);
-    fileName = getTmpRecoveredEditsFileName(fileName + "-" + 
fileNameBeingSplit);
+    fileName =
+      getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit + "-" + 
workerNameComponent);
     return new Path(dir, fileName);
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index fec532f89da..73f41604361 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.LastSequenceId;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.security.User;
@@ -100,6 +102,7 @@ import 
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
 /**
@@ -366,6 +369,70 @@ public class TestWALSplit {
     }
   }
 
+  // If another worker is assigned to split a WAL while the last worker is 
still running, they should
+  // not impact each other's progress
+  @Test
+  public void testTwoWorkerSplittingSameWAL() throws IOException, 
InterruptedException {
+    int numWriter = 1, entries = 10;
+    generateWALs(numWriter, entries, -1, 0);
+    FileStatus logfile = fs.listStatus(WALDIR)[0];
+    FileSystem spiedFs = Mockito.spy(fs);
+    RegionServerServices zombieRSServices = 
Mockito.mock(RegionServerServices.class);
+    RegionServerServices newWorkerRSServices = 
Mockito.mock(RegionServerServices.class);
+    Mockito.when(zombieRSServices.getServerName())
+      .thenReturn(ServerName.valueOf("zombie-rs.abc.com,1234,1234567890"));
+    Mockito.when(newWorkerRSServices.getServerName())
+      .thenReturn(ServerName.valueOf("worker-rs.abc.com,1234,1234569870"));
+    Thread zombieWorker = new SplitWALWorker(logfile, spiedFs, 
zombieRSServices);
+    Thread newWorker = new SplitWALWorker(logfile, spiedFs, 
newWorkerRSServices);
+    zombieWorker.start();
+    newWorker.start();
+    newWorker.join();
+    zombieWorker.join();
+
+    for (String region : REGIONS) {
+      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
+      assertEquals("wrong number of split files for region", numWriter, 
logfiles.length);
+
+      int count = 0;
+      for (Path lf : logfiles) {
+        count += countWAL(lf);
+      }
+      assertEquals("wrong number of edits for region " + region, entries, 
count);
+    }
+  }
+
+  private class SplitWALWorker extends Thread implements LastSequenceId {
+    final FileStatus logfile;
+    final FileSystem fs;
+    final RegionServerServices rsServices;
+
+    public SplitWALWorker(FileStatus logfile, FileSystem fs, 
RegionServerServices rsServices) {
+      super(rsServices.getServerName().toShortString());
+      setDaemon(true);
+      this.fs = fs;
+      this.logfile = logfile;
+      this.rsServices = rsServices;
+    }
+
+    @Override
+    public void run() {
+      try {
+        boolean ret =
+          WALSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, null, this, 
null, wals, rsServices);
+        assertTrue("Both splitting should pass", ret);
+      } catch (IOException e) {
+        LOG.warn(getName() + " Worker exiting " + e);
+      }
+    }
+
+    @Override
+    public ClusterStatusProtos.RegionStoreSequenceIds getLastSequenceId(byte[] 
encodedRegionName) {
+      return ClusterStatusProtos.RegionStoreSequenceIds.newBuilder()
+        .setLastFlushedSequenceId(HConstants.NO_SEQNUM).build();
+    }
+  }
+
   /**
    * @see "https://issues.apache.org/jira/browse/HBASE-3020";
    */
@@ -397,7 +464,7 @@ public class TestWALSplit {
   private Path createRecoveredEditsPathForRegion() throws IOException {
     byte[] encoded = 
RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
     Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, 
encoded, 1,
-      FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
+      FILENAME_BEING_SPLIT, TMPDIRNAME, conf, "");
     return p;
   }
 

Reply via email to