Repository: hbase Updated Branches: refs/heads/branch-1.1 3203e2724 -> f4cbf77f2
HBASE-16931 Setting cell's seqId to zero in compaction flow might cause RS down. Signed-off-by: Yu Li <l...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f4cbf77f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f4cbf77f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f4cbf77f Branch: refs/heads/branch-1.1 Commit: f4cbf77f28a2668743b1c73b5aeb113c3456fb13 Parents: 3203e27 Author: binlijin <binli...@gmail.com> Authored: Mon Oct 24 18:08:23 2016 +0800 Committer: Yu Li <l...@apache.org> Committed: Tue Oct 25 20:04:04 2016 +0800 ---------------------------------------------------------------------- .../regionserver/compactions/Compactor.java | 11 +++ .../hbase/regionserver/TestCompaction.java | 78 ++++++++++++++++++++ 2 files changed, 89 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/f4cbf77f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index d2227ab..f126a8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -314,9 +314,16 @@ public abstract class Compactor { now = EnvironmentEdgeManager.currentTime(); } // output to writer: + Cell lastCleanCell = null; + long lastCleanCellSeqId = 0; for (Cell c : cells) { if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) { + lastCleanCell = c; + lastCleanCellSeqId = c.getSequenceId(); CellUtil.setSequenceId(c, 0); + } else { + lastCleanCell = null; + lastCleanCellSeqId = 0; } writer.append(c); int len = KeyValueUtil.length(c); @@ -338,6 +345,10 @@ public abstract class Compactor { } } } + if (lastCleanCell != null) { + // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly + CellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId); + } // Log the progress of long running compactions every minute if // logging at DEBUG level if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/f4cbf77f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 01c7848..3ce287b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.security.User; @@ -67,6 +68,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; import org.junit.Assume; import org.junit.Before; @@ -97,6 +99,7 @@ public class TestCompaction { private int compactionThreshold; private byte[] secondRowBytes, thirdRowBytes; private static final long MAX_FILES_TO_COMPACT = 10; + private final byte[] FAMILY = Bytes.toBytes("cf"); /** constructor */ public TestCompaction() { @@ -119,6 +122,15 @@ public class TestCompaction { @Before public void setUp() throws Exception { this.htd = UTIL.createTableDescriptor(name.getMethodName()); + if (name.getMethodName().equals("testCompactionSeqId")) { + UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10"); + UTIL.getConfiguration().set( + DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, + DummyCompactor.class.getName()); + HColumnDescriptor hcd = new HColumnDescriptor(FAMILY); + hcd.setMaxVersions(65536); + this.htd.addFamily(hcd); + } this.r = UTIL.createLocalHRegion(htd, null, null); } @@ -569,6 +581,72 @@ public class TestCompaction { cst.interruptIfNecessary(); } + /** + * Firstly write 10 cells (with different time stamp) to a qualifier and flush + * to hfile1, then write 10 cells (with different time stamp) to the same + * qualifier and flush to hfile2. The latest cell (cell-A) in hfile1 and the + * oldest cell (cell-B) in hfile2 are with the same time stamp but different + * sequence id, and will get scanned successively during compaction. + * <p/> + * We set compaction.kv.max to 10 so compaction will scan 10 versions each + * round, meanwhile we set keepSeqIdPeriod=0 in {@link DummyCompactor} so all + * 10 versions of hfile2 will be written out with seqId cleaned (set to 0) + * including cell-B, then when scanner goes to cell-A it will cause a scan + * out-of-order assertion error before HBASE-16931 + * + * @throws Exception + * if error occurs during the test + */ + @Test + public void testCompactionSeqId() throws Exception { + final byte[] ROW = Bytes.toBytes("row"); + final byte[] QUALIFIER = Bytes.toBytes("qualifier"); + + long timestamp = 10000; + + // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9 + // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8 + // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7 + // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6 + // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5 + // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4 + // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3 + // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2 + // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1 + // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0 + for (int i = 0; i < 10; i++) { + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); + r.put(put); + } + r.flush(true); + + // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18 + // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17 + // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16 + // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15 + // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14 + // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13 + // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12 + // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11 + // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10 + // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9 + for (int i = 18; i > 8; i--) { + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i)); + r.put(put); + } + r.flush(true); + r.compact(true); + } + + public static class DummyCompactor extends DefaultCompactor { + public DummyCompactor(Configuration conf, Store store) { + super(conf, store); + this.keepSeqIdPeriod = 0; + } + } + private static StoreFile createFile() throws Exception { StoreFile sf = mock(StoreFile.class); when(sf.getPath()).thenReturn(new Path("file"));