Repository: hbase
Updated Branches:
  refs/heads/master 6f8c7dca1 -> d2ba87509


HBASE-14949 Resolve name conflict when splitting if there are duplicated WAL 
entries


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d2ba8750
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d2ba8750
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d2ba8750

Branch: refs/heads/master
Commit: d2ba87509b8d193f58183beff4ab76c7edf47e11
Parents: 6f8c7dc
Author: zhangduo <zhang...@apache.org>
Authored: Thu Feb 18 10:31:01 2016 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Thu Feb 18 19:48:52 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/wal/WALSplitter.java    |  88 +++++++++++-----
 .../hbase/regionserver/wal/TestWALReplay.java   | 105 ++++++++++++++++---
 2 files changed, 154 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d2ba8750/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 8abd950..54b82b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -176,6 +176,10 @@ public class WALSplitter {
   // Min batch size when replay WAL edits
   private final int minBatchSize;
 
+  // the file being split currently
+  private FileStatus fileBeingSplit;
+
+  @VisibleForTesting
   WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
       FileSystem fs, LastSequenceId idChecker,
       CoordinatedStateManager csm, RecoveryMode mode) {
@@ -267,6 +271,7 @@ public class WALSplitter {
    * log splitting implementation, splits one log file.
    * @param logfile should be an actual log file.
    */
+  @VisibleForTesting
   boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) 
throws IOException {
     Preconditions.checkState(status == null);
     Preconditions.checkArgument(logfile.isFile(),
@@ -285,6 +290,7 @@ public class WALSplitter {
         TaskMonitor.get().createStatus(
           "Splitting log file " + logfile.getPath() + "into a temporary 
staging area.");
     Reader in = null;
+    this.fileBeingSplit = logfile;
     try {
       long logLength = logfile.getLen();
       LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
@@ -349,7 +355,7 @@ public class WALSplitter {
           }
           lastFlushedSequenceIds.put(encodedRegionNameAsStr, 
lastFlushedSequenceId);
         }
-        if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
+        if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
           editsSkipped++;
           continue;
         }
@@ -435,7 +441,7 @@ public class WALSplitter {
     finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
   }
 
-  static void finishSplitLogFile(Path rootdir, Path oldLogDir,
+  private static void finishSplitLogFile(Path rootdir, Path oldLogDir,
       Path logPath, Configuration conf) throws IOException {
     List<Path> processedLogs = new ArrayList<Path>();
     List<Path> corruptedLogs = new ArrayList<Path>();
@@ -509,12 +515,13 @@ public class WALSplitter {
    * @param fs
    * @param logEntry
    * @param rootDir HBase root dir.
+   * @param fileBeingSplit the file being split currently. Used to generate 
tmp file name.
    * @return Path to file into which to dump split log edits.
    * @throws IOException
    */
   @SuppressWarnings("deprecation")
-  static Path getRegionSplitEditsPath(final FileSystem fs,
-      final Entry logEntry, final Path rootDir, boolean isCreate)
+  private static Path getRegionSplitEditsPath(final FileSystem fs,
+      final Entry logEntry, final Path rootDir, FileStatus fileBeingSplit)
   throws IOException {
     Path tableDir = FSUtils.getTableDir(rootDir, 
logEntry.getKey().getTablename());
     String encodedRegionName = 
Bytes.toString(logEntry.getKey().getEncodedRegionName());
@@ -542,17 +549,18 @@ public class WALSplitter {
       }
     }
 
-    if (isCreate && !fs.exists(dir)) {
-      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
+    if (!fs.exists(dir) && !fs.mkdirs(dir)) {
+      LOG.warn("mkdir failed on " + dir);
     }
+    // Append fileBeingSplit to prevent name conflict since we may have 
duplicate wal entries now.
     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
     // region's replayRecoveredEdits will not delete it
-    String fileName = 
formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
-    fileName = getTmpRecoveredEditsFileName(fileName);
+    String fileName = 
formatRecoveredEditsFileName(logEntry.getKey().getSequenceId());
+    fileName = getTmpRecoveredEditsFileName(fileName + "-" + 
fileBeingSplit.getPath().getName());
     return new Path(dir, fileName);
   }
 
-  static String getTmpRecoveredEditsFileName(String fileName) {
+  private static String getTmpRecoveredEditsFileName(String fileName) {
     return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
   }
 
@@ -564,12 +572,13 @@ public class WALSplitter {
    * @param maximumEditLogSeqNum
    * @return dstPath take file's last edit log seq num as the name
    */
-  static Path getCompletedRecoveredEditsFilePath(Path srcPath,
-      Long maximumEditLogSeqNum) {
+  private static Path getCompletedRecoveredEditsFilePath(Path srcPath,
+      long maximumEditLogSeqNum) {
     String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
     return new Path(srcPath.getParent(), fileName);
   }
 
+  @VisibleForTesting
   static String formatRecoveredEditsFileName(final long seqid) {
     return String.format("%019d", seqid);
   }
@@ -1175,9 +1184,9 @@ public class WALSplitter {
       synchronized (regionMaximumEditLogSeqNum) {
         Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
             .getEncodedRegionName());
-        if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > 
currentMaxSeqNum) {
+        if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > 
currentMaxSeqNum) {
           
regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), 
entry.getKey()
-              .getLogSeqNum());
+              .getSequenceId());
         }
       }
     }
@@ -1296,6 +1305,39 @@ public class WALSplitter {
       return splits;
     }
 
+    // delete the one with fewer wal entries
+    private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws 
IOException {
+      long dstMinLogSeqNum = -1L;
+      try (WAL.Reader reader = walFactory.createReader(fs, dst)) {
+        WAL.Entry entry = reader.next();
+        if (entry != null) {
+          dstMinLogSeqNum = entry.getKey().getSequenceId();
+        }
+      } catch (EOFException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "Got EOF when reading first WAL entry from " + dst + ", an empty 
or broken WAL file?",
+            e);
+        }
+      }
+      if (wap.minLogSeqNum < dstMinLogSeqNum) {
+        LOG.warn("Found existing old edits file. It could be the result of a 
previous failed"
+            + " split attempt or we have duplicated wal entries. Deleting " + 
dst + ", length="
+            + fs.getFileStatus(dst).getLen());
+        if (!fs.delete(dst, false)) {
+          LOG.warn("Failed deleting of old " + dst);
+          throw new IOException("Failed deleting of old " + dst);
+        }
+      } else {
+        LOG.warn("Found existing old edits file and we have less entries. 
Deleting " + wap.p
+            + ", length=" + fs.getFileStatus(wap.p).getLen());
+        if (!fs.delete(wap.p, false)) {
+          LOG.warn("Failed deleting of " + wap.p);
+          throw new IOException("Failed deleting of " + wap.p);
+        }
+      }
+    }
+
     /**
      * Close all of the output streams.
      * @return the list of paths written.
@@ -1351,13 +1393,7 @@ public class WALSplitter {
               regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
             try {
               if (!dst.equals(wap.p) && fs.exists(dst)) {
-                LOG.warn("Found existing old edits file. It could be the "
-                    + "result of a previous failed split attempt. Deleting " + 
dst + ", length="
-                    + fs.getFileStatus(dst).getLen());
-                if (!fs.delete(dst, false)) {
-                  LOG.warn("Failed deleting of old " + dst);
-                  throw new IOException("Failed deleting of old " + dst);
-                }
+                deleteOneWithFewerEntries(wap, dst);
               }
               // Skip the unit tests which create a splitter that reads and
               // writes the data without touching disk.
@@ -1482,7 +1518,7 @@ public class WALSplitter {
      * @return a path with a write for that path. caller should close.
      */
     private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) 
throws IOException {
-      Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
+      Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, 
fileBeingSplit);
       if (regionedits == null) {
         return null;
       }
@@ -1496,7 +1532,7 @@ public class WALSplitter {
       }
       Writer w = createWriter(regionedits);
       LOG.debug("Creating writer path=" + regionedits);
-      return new WriterAndPath(regionedits, w);
+      return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
     }
 
     private void filterCellByStore(Entry logEntry) {
@@ -1516,7 +1552,7 @@ public class WALSplitter {
           Long maxSeqId = maxSeqIdInStores.get(family);
           // Do not skip cell even if maxSeqId is null. Maybe we are in a 
rolling upgrade,
           // or the master was crashed before and we can not get the 
information.
-          if (maxSeqId == null || maxSeqId.longValue() < 
logEntry.getKey().getLogSeqNum()) {
+          if (maxSeqId == null || maxSeqId.longValue() < 
logEntry.getKey().getSequenceId()) {
             keptCells.add(cell);
           }
         }
@@ -1623,10 +1659,12 @@ public class WALSplitter {
   private final static class WriterAndPath extends SinkWriter {
     final Path p;
     final Writer w;
+    final long minLogSeqNum;
 
-    WriterAndPath(final Path p, final Writer w) {
+    WriterAndPath(final Path p, final Writer w, final long minLogSeqNum) {
       this.p = p;
       this.w = w;
+      this.minLogSeqNum = minLogSeqNum;
     }
   }
 
@@ -1819,7 +1857,7 @@ public class WALSplitter {
             }
             if (maxStoreSequenceIds != null) {
               Long maxStoreSeqId = maxStoreSequenceIds.get(family);
-              if (maxStoreSeqId == null || maxStoreSeqId >= 
entry.getKey().getLogSeqNum()) {
+              if (maxStoreSeqId == null || maxStoreSeqId >= 
entry.getKey().getSequenceId()) {
                 // skip current kv if column family doesn't exist anymore or 
already flushed
                 skippedCells.add(cell);
                 continue;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d2ba8750/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 40e5baa..dbc06ff 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -1034,6 +1035,56 @@ public class TestWALReplay {
     assertEquals(result.size(), region2.get(g).size());
   }
 
+  /**
+   * testcase for https://issues.apache.org/jira/browse/HBASE-14949.
+   */
+  private void testNameConflictWhenSplit(boolean largeFirst) throws 
IOException {
+    final TableName tableName = 
TableName.valueOf("testReplayEditsWrittenIntoWAL");
+    final MultiVersionConcurrencyControl mvcc = new 
MultiVersionConcurrencyControl();
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
+    deleteDir(basedir);
+
+    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
+    HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, 
this.conf, htd);
+    HBaseTestingUtility.closeRegionAndWAL(region);
+    final byte[] family = htd.getColumnFamilies()[0].getName();
+    final byte[] rowName = tableName.getName();
+    FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, 
mvcc, 1);
+    FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, 
mvcc, 2);
+
+    Path largeFile = new Path(logDir, "wal-1");
+    Path smallFile = new Path(logDir, "wal-2");
+    writerWALFile(largeFile, Arrays.asList(entry1, entry2));
+    writerWALFile(smallFile, Arrays.asList(entry2));
+    FileStatus first, second;
+    if (largeFirst) {
+      first = fs.getFileStatus(largeFile);
+      second = fs.getFileStatus(smallFile);
+    } else {
+      first = fs.getFileStatus(smallFile);
+      second = fs.getFileStatus(largeFile);
+    }
+    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null,
+      RecoveryMode.LOG_SPLITTING, wals);
+    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null,
+      RecoveryMode.LOG_SPLITTING, wals);
+    WAL wal = createWAL(this.conf);
+    region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
+    assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
+    assertEquals(2, region.get(new Get(rowName)).size());
+  }
+
+  @Test
+  public void testNameConflictWhenSplit0() throws IOException {
+    testNameConflictWhenSplit(true);
+  }
+
+  @Test
+  public void testNameConflictWhenSplit1() throws IOException {
+    testNameConflictWhenSplit(false);
+  }
+
   static class MockWAL extends FSHLog {
     boolean doCompleteCacheFlush = false;
 
@@ -1102,27 +1153,42 @@ public class TestWALReplay {
     }
   }
 
+  private WALKey createWALKey(final TableName tableName, final HRegionInfo hri,
+      final MultiVersionConcurrencyControl mvcc) {
+    return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc);
+  }
+
+  private WALEdit createWALEdit(final byte[] rowName, final byte[] family, 
EnvironmentEdge ee,
+      int index) {
+    byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index));
+    byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + 
Integer.toString(index));
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), 
columnBytes));
+    return edit;
+  }
+
+  private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, 
long sequence,
+      byte[] rowName, byte[] family, EnvironmentEdge ee, 
MultiVersionConcurrencyControl mvcc,
+      int index) throws IOException {
+    FSWALEntry entry =
+        new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc), 
createWALEdit(
+          rowName, family, ee, index), htd, hri, true);
+    entry.stampRegionSequenceId();
+    return entry;
+  }
+
   private void addWALEdits(final TableName tableName, final HRegionInfo hri, 
final byte[] rowName,
       final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
-      final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc)
-  throws IOException {
-    String familyStr = Bytes.toString(family);
+      final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) 
throws IOException {
     for (int j = 0; j < count; j++) {
-      byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
-      byte[] columnBytes = Bytes.toBytes(familyStr + ":" + 
Integer.toString(j));
-      WALEdit edit = new WALEdit();
-      edit.add(new KeyValue(rowName, family, qualifierBytes,
-        ee.currentTime(), columnBytes));
-      wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), 
tableName,999, mvcc),
-          edit, true);
+      wal.append(htd, hri, createWALKey(tableName, hri, mvcc),
+        createWALEdit(rowName, family, ee, j), true);
     }
     wal.sync();
   }
 
-  static List<Put> addRegionEdits (final byte [] rowName, final byte [] family,
-      final int count, EnvironmentEdge ee, final Region r,
-      final String qualifierPrefix)
-  throws IOException {
+  static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, 
final int count,
+      EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws 
IOException {
     List<Put> puts = new ArrayList<Put>();
     for (int j = 0; j < count; j++) {
       byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
@@ -1183,4 +1249,15 @@ public class TestWALReplay {
     htd.addFamily(c);
     return htd;
   }
+
+  private void writerWALFile(Path file, List<FSWALEntry> entries) throws 
IOException {
+    fs.mkdirs(file.getParent());
+    ProtobufLogWriter writer = new ProtobufLogWriter();
+    writer.init(fs, file, conf, true);
+    for (FSWALEntry entry : entries) {
+      writer.append(entry);
+    }
+    writer.sync();
+    writer.close();
+  }
 }

Reply via email to