Repository: hbase

Updated Branches:
  refs/heads/branch-1 d34e9c5c5 -> 2baf3bfc9
http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index c55280b..7fbb285 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -124,6 +124,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; /** * This class is responsible for splitting up a bunch of regionserver commit log @@ -324,15 +325,19 @@ public class WALSplitter { failedServerName = (serverName == null) ? "" : serverName.getServerName(); while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) { byte[] region = entry.getKey().getEncodedRegionName(); - String key = Bytes.toString(region); - lastFlushedSequenceId = lastFlushedSequenceIds.get(key); + String encodedRegionNameAsStr = Bytes.toString(region); + lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); if (lastFlushedSequenceId == null) { if (this.distributedLogReplay) { RegionStoreSequenceIds ids = csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, - key); + encodedRegionNameAsStr); if (ids != null) { lastFlushedSequenceId = ids.getLastFlushedSequenceId(); + if (LOG.isDebugEnabled()) { + LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " + + TextFormat.shortDebugString(ids)); + } } } else if (sequenceIdChecker != null) { RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region); @@ -341,13 +346,17 @@ public class WALSplitter { maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(), storeSeqId.getSequenceId()); } - regionMaxSeqIdInStores.put(key, maxSeqIdInStores); + regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores); lastFlushedSequenceId = ids.getLastFlushedSequenceId(); + if (LOG.isDebugEnabled()) { + LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " + + TextFormat.shortDebugString(ids)); + } } if (lastFlushedSequenceId == null) { lastFlushedSequenceId = -1L; } - lastFlushedSequenceIds.put(key, lastFlushedSequenceId); + lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId); } if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { editsSkipped++; @@ -1071,7 +1080,7 @@ public class WALSplitter { } private void doRun() throws IOException { - LOG.debug("Writer thread " + this + ": starting"); + if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting"); while (true) { RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); if (buffer == null) { @@ -1226,7 +1235,8 @@ public class WALSplitter { } } controller.checkForErrors(); - LOG.info("Split writers finished"); + LOG.info((this.writerThreads == null? 
0: this.writerThreads.size()) + + " split writers finished; closing..."); return (!progress_failed); } @@ -1317,12 +1327,14 @@ public class WALSplitter { CompletionService<Void> completionService = new ExecutorCompletionService<Void>(closeThreadPool); for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) { - LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p); + if (LOG.isTraceEnabled()) { + LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p); + } completionService.submit(new Callable<Void>() { @Override public Void call() throws Exception { WriterAndPath wap = (WriterAndPath) writersEntry.getValue(); - LOG.debug("Closing " + wap.p); + if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p); try { wap.w.close(); } catch (IOException ioe) { @@ -1330,8 +1342,8 @@ public class WALSplitter { thrown.add(ioe); return null; } - LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in " - + (wap.nanosSpent / 1000 / 1000) + "ms)"); + LOG.info("Closed " + wap.p + "; wrote " + wap.editsWritten + " edit(s) in " + + (wap.nanosSpent / 1000 / 1000) + "ms"); if (wap.editsWritten == 0) { // just remove the empty recovered.edits file @@ -1490,8 +1502,8 @@ public class WALSplitter { } } Writer w = createWriter(regionedits); - LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region)); - return (new WriterAndPath(regionedits, w)); + LOG.debug("Creating writer path=" + regionedits); + return new WriterAndPath(regionedits, w); } private void filterCellByStore(Entry logEntry) { @@ -1505,6 +1517,7 @@ public class WALSplitter { if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { byte[] family = CellUtil.cloneFamily(cell); Long maxSeqId = maxSeqIdInStores.get(family); + LOG.info("CHANGE REMOVE " + Bytes.toString(family) + ", max=" + maxSeqId); // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, // or the master was crashed before and we can not get the information. 
if (maxSeqId != null && maxSeqId.longValue() >= logEntry.getKey().getLogSeqNum()) { @@ -1544,9 +1557,9 @@ public class WALSplitter { filterCellByStore(logEntry); if (!logEntry.getEdit().isEmpty()) { wap.w.append(logEntry); + this.updateRegionMaximumEditLogSeqNum(logEntry); + editsCount++; } - this.updateRegionMaximumEditLogSeqNum(logEntry); - editsCount++; } // Pass along summary statistics wap.incrementEdits(editsCount); http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java index 9a9d784..b328e57 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.ipc; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.security.UserProvider; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java index 5e6bff8..abb6520 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java @@ -91,6 +91,7 @@ public class TestGetLastFlushedSequenceId { testUtil.getHBaseCluster().getMaster() .getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes()); assertEquals(HConstants.NO_SEQNUM, ids.getLastFlushedSequenceId()); + // This will be the sequenceid just before that of the earliest edit in memstore. long storeSequenceId = ids.getStoreSequenceId(0).getSequenceId(); assertTrue(storeSequenceId > 0); testUtil.getHBaseAdmin().flush(tableName); http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index b61416c..6b342d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -231,6 +231,35 @@ public class TestHRegion { } /** + * Test that I can use the max flushed sequence id after the close. + * @throws IOException + */ + @Test (timeout = 100000) + public void testSequenceId() throws IOException { + HRegion region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES); + assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); + // Weird. This returns 0 if no store files or no edits. Afraid to change it. 
+ assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); + region.close(); + assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); + assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); + // Open region again. + region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES); + byte [] value = Bytes.toBytes(name.getMethodName()); + // Make a random put against our cf. + Put put = new Put(value); + put.addColumn(COLUMN_FAMILY_BYTES, null, value); + region.put(put); + // No flush yet so init numbers should still be in place. + assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId()); + assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES)); + region.flush(true); + long max = region.getMaxFlushedSeqId(); + region.close(); + assertEquals(max, region.getMaxFlushedSeqId()); + } + + /** * Test for Bug 2 of HBASE-10466. * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize * is smaller than a certain value, or when region close starts a flush is ongoing, the first http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java new file mode 100644 index 0000000..92e0558 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.hadoop.hbase.DroppedSnapshotException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; +import org.apache.hadoop.hbase.regionserver.Region.FlushResult; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mortbay.log.Log; + +/** + * Testcase for https://issues.apache.org/jira/browse/HBASE-13811 + */ +@Category({ MediumTests.class }) +public class TestSplitWalDataLoss { + + private final HBaseTestingUtility testUtil = new HBaseTestingUtility(); + + private NamespaceDescriptor namespace = NamespaceDescriptor.create(getClass().getSimpleName()) + .build(); + + private TableName tableName = TableName.valueOf(namespace.getName(), "dataloss"); + + private byte[] family = Bytes.toBytes("f"); + + private byte[] qualifier = Bytes.toBytes("q"); + + @Before + public void setUp() throws Exception { + testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000); + testUtil.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false); + testUtil.startMiniCluster(2); + HBaseAdmin admin = testUtil.getHBaseAdmin(); + admin.createNamespace(namespace); + admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family))); + testUtil.waitTableAvailable(tableName); + } + + @After + public void tearDown() throws Exception { + testUtil.shutdownMiniCluster(); + } + + @Test + public void test() throws IOException, InterruptedException { + final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName); + final HRegion region = (HRegion) rs.getOnlineRegions(tableName).get(0); + HRegion spiedRegion = spy(region); + final MutableBoolean flushed = new MutableBoolean(false); + final MutableBoolean reported = new MutableBoolean(false); + doAnswer(new Answer<FlushResult>() { + @Override + public FlushResult answer(InvocationOnMock invocation) throws Throwable { + synchronized (flushed) { + flushed.setValue(true); + flushed.notifyAll(); + } + synchronized (reported) { + while (!reported.booleanValue()) { + reported.wait(); + } + } + rs.getWAL(region.getRegionInfo()).abortCacheFlush( + region.getRegionInfo().getEncodedNameAsBytes()); + throw new 
DroppedSnapshotException("testcase"); + } + }).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(), + Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(), + Matchers.<Collection<Store>> any()); + rs.onlineRegions.put(rs.onlineRegions.keySet().iterator().next(), spiedRegion); + Connection conn = testUtil.getConnection(); + + try (Table table = conn.getTable(tableName)) { + table.put(new Put(Bytes.toBytes("row0")).addColumn(family, qualifier, Bytes.toBytes("val0"))); + } + long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family); + Log.info("CHANGE OLDEST " + oldestSeqIdOfStore); + assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM); + rs.cacheFlusher.requestFlush(spiedRegion, false); + synchronized (flushed) { + while (!flushed.booleanValue()) { + flushed.wait(); + } + } + try (Table table = conn.getTable(tableName)) { + table.put(new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, Bytes.toBytes("val1"))); + } + long now = EnvironmentEdgeManager.currentTime(); + rs.tryRegionServerReport(now - 500, now); + synchronized (reported) { + reported.setValue(true); + reported.notifyAll(); + } + while (testUtil.getRSForFirstRegionInTable(tableName) == rs) { + Thread.sleep(100); + } + try (Table table = conn.getTable(tableName)) { + Result result = table.get(new Get(Bytes.toBytes("row0"))); + assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier)); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index cc5191c..b3b520a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -27,9 +26,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; @@ -50,7 +47,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -58,6 +54,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -320,53 +317,6 @@ public class TestFSHLog { } } - /** - * 
Simulates WAL append ops for a region and tests - * {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} API. - * It compares the region sequenceIds with oldestFlushing and oldestUnFlushed entries. - * If a region's entries are larger than min of (oldestFlushing, oldestUnFlushed), then the - * region should be flushed before archiving this WAL. - */ - @Test - public void testAllRegionsFlushed() { - LOG.debug("testAllRegionsFlushed"); - Map<byte[], Long> oldestFlushingSeqNo = new HashMap<byte[], Long>(); - Map<byte[], Long> oldestUnFlushedSeqNo = new HashMap<byte[], Long>(); - Map<byte[], Long> seqNo = new HashMap<byte[], Long>(); - // create a table - TableName t1 = TableName.valueOf("t1"); - // create a region - HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); - // variables to mock region sequenceIds - final AtomicLong sequenceId1 = new AtomicLong(1); - // test empty map - assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); - // add entries in the region - seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet()); - oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get()); - // should say region1 is not flushed. - assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); - // test with entries in oldestFlushing map. - oldestUnFlushedSeqNo.clear(); - oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get()); - assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); - // simulate region flush, i.e., clear oldestFlushing and oldestUnflushed maps - oldestFlushingSeqNo.clear(); - oldestUnFlushedSeqNo.clear(); - assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); - // insert some large values for region1 - oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000l); - seqNo.put(hri1.getEncodedNameAsBytes(), 1500l); - assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); - - // tests when oldestUnFlushed/oldestFlushing contains larger value. - // It means region is flushed. - oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200l); - oldestUnFlushedSeqNo.clear(); - seqNo.put(hri1.getEncodedNameAsBytes(), 1199l); - assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo)); - } - @Test(expected=IOException.class) public void testFailedToCreateWALIfParentRenamed() throws IOException { final String name = "testFailedToCreateWALIfParentRenamed"; http://git-wip-us.apache.org/repos/asf/hbase/blob/2baf3bfc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java new file mode 100644 index 0000000..9fd0cb1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestSequenceIdAccounting { + private static final byte [] ENCODED_REGION_NAME = Bytes.toBytes("r"); + private static final byte [] FAMILY_NAME = Bytes.toBytes("cf"); + private static final Set<byte[]> FAMILIES; + static { + FAMILIES = new HashSet<byte[]>(); + FAMILIES.add(FAMILY_NAME); + } + + @Test + public void testStartCacheFlush() { + SequenceIdAccounting sida = new SequenceIdAccounting(); + sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME); + Map<byte[], Long> m = new HashMap<byte[], Long>(); + m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); + assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES)); + sida.completeCacheFlush(ENCODED_REGION_NAME); + long sequenceid = 1; + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); + // Only one family so should return NO_SEQNUM still. + assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES)); + sida.completeCacheFlush(ENCODED_REGION_NAME); + long currentSequenceId = sequenceid; + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); + final Set<byte[]> otherFamily = new HashSet<byte[]>(1); + otherFamily.add(Bytes.toBytes("otherCf")); + sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); + // Should return oldest sequence id in the region. + assertEquals(currentSequenceId, (long)sida.startCacheFlush(ENCODED_REGION_NAME, otherFamily)); + sida.completeCacheFlush(ENCODED_REGION_NAME); + } + + @Test + public void testAreAllLower() { + SequenceIdAccounting sida = new SequenceIdAccounting(); + sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME); + Map<byte[], Long> m = new HashMap<byte[], Long>(); + m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); + assertTrue(sida.areAllLower(m)); + long sequenceid = 1; + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + assertTrue(sida.areAllLower(m)); + m.put(ENCODED_REGION_NAME, sequenceid); + assertFalse(sida.areAllLower(m)); + long lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME); + assertEquals("Lowest should be first sequence id inserted", 1, lowest); + m.put(ENCODED_REGION_NAME, lowest); + assertFalse(sida.areAllLower(m)); + // Now make sure above works when flushing. 
+ sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES); + assertFalse(sida.areAllLower(m)); + m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); + assertTrue(sida.areAllLower(m)); + // Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits + sida.completeCacheFlush(ENCODED_REGION_NAME); + m.put(ENCODED_REGION_NAME, sequenceid); + assertTrue(sida.areAllLower(m)); + // Flush again but add sequenceids while we are flushing. + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME); + m.put(ENCODED_REGION_NAME, lowest); + assertFalse(sida.areAllLower(m)); + sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES); + // The cache flush will clear out all sequenceid accounting by region. + assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME)); + sida.completeCacheFlush(ENCODED_REGION_NAME); + // No new edits have gone in so no sequenceid to work with. + assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME)); + // Make an edit behind all we'll put now into sida. + m.put(ENCODED_REGION_NAME, sequenceid); + sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); + assertTrue(sida.areAllLower(m)); + } + + @Test + public void testFindLower() { + SequenceIdAccounting sida = new SequenceIdAccounting(); + sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME); + Map<byte[], Long> m = new HashMap<byte[], Long>(); + m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); + long sequenceid = 1; + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); + assertTrue(sida.findLower(m) == null); + m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME)); + assertTrue(sida.findLower(m).length == 1); + m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME) - 1); + assertTrue(sida.findLower(m) == null); + } +} \ No newline at end of file
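The first WALSplitter hunk above looks up the last flushed sequence id for each encoded region (from the split-log-worker coordination under distributed log replay, or from the sequenceIdChecker otherwise), caches it by encoded region name, and skips WAL entries whose seqnum is not newer than it. Below is a rough, self-contained sketch of just that decision; the lookup stub and the surrounding class are assumptions for illustration, only the shape of the check comes from the diff.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the skip decision in WALSplitter's split loop (not HBase code).
    public class SplitSkipSketch {

      private final Map<String, Long> lastFlushedSequenceIds = new HashMap<String, Long>();
      private long editsSkipped = 0;

      // Stand-in for the DLR coordination / sequenceIdChecker lookups;
      // returns null when nothing is known about the region.
      private Long lookupLastFlushedId(String encodedRegionNameAsStr) {
        return "region-1".equals(encodedRegionNameAsStr) ? 42L : null;
      }

      /** Returns true when the edit with the given seqnum should be replayed. */
      boolean shouldReplay(String encodedRegionNameAsStr, long logSeqNum) {
        Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
        if (lastFlushedSequenceId == null) {
          lastFlushedSequenceId = lookupLastFlushedId(encodedRegionNameAsStr);
          if (lastFlushedSequenceId == null) {
            lastFlushedSequenceId = -1L;       // unknown: keep every edit
          }
          lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
        }
        if (lastFlushedSequenceId >= logSeqNum) {
          editsSkipped++;                      // already durable in the store files
          return false;
        }
        return true;
      }

      public static void main(String[] args) {
        SplitSkipSketch sketch = new SplitSkipSketch();
        System.out.println(sketch.shouldReplay("region-1", 40)); // false: 40 <= 42, skipped
        System.out.println(sketch.shouldReplay("region-1", 43)); // true: newer than flushed point
        System.out.println(sketch.shouldReplay("region-2", 1));  // true: nothing known, keep it
      }
    }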
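The later WALSplitter hunk moves the edit counter and the region's maximum edit-log seqnum update inside the branch that actually appends the entry, so entries whose cells were all removed by filterCellByStore no longer advance either. A minimal, self-contained sketch of the corrected pattern follows; the Entry and Writer stubs and the helper names are illustrative assumptions, not HBase classes.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Sketch of the corrected accounting in the split-writer path (not HBase code).
    public class WalSplitAccountingSketch {

      static class Entry {
        final long seqNum;
        final List<String> cells;
        Entry(long seqNum, List<String> cells) {
          this.seqNum = seqNum;
          this.cells = new ArrayList<String>(cells);
        }
      }

      static class Writer {
        long maxSeqNumWritten = -1L;
        int editsWritten = 0;
        void append(Entry e) {
          // pretend to write the entry to a recovered.edits file
          maxSeqNumWritten = Math.max(maxSeqNumWritten, e.seqNum);
          editsWritten++;
        }
      }

      // Stand-in for filterCellByStore(): drop all cells of an edit whose seqnum
      // is already covered by the store's maximum flushed sequence id.
      static void filterCellByStore(Entry e, long maxStoreSeqId) {
        if (e.seqNum <= maxStoreSeqId) {
          e.cells.clear();
        }
      }

      static int writeEdits(List<Entry> edits, Writer wap, long maxStoreSeqId) {
        int editsCount = 0;
        for (Entry logEntry : edits) {
          filterCellByStore(logEntry, maxStoreSeqId);
          if (!logEntry.cells.isEmpty()) {
            wap.append(logEntry);   // write only entries that still carry cells
            editsCount++;           // and count only those edits
          }
          // Before the change above, the counter and max-seqnum update also ran
          // for entries whose cells had all been filtered away.
        }
        return editsCount;
      }

      public static void main(String[] args) {
        List<Entry> edits = Arrays.asList(
            new Entry(5, Arrays.asList("cf:q=v1")),   // at or below flushed point (7), filtered out
            new Entry(9, Arrays.asList("cf:q=v2")));  // newer than flushed point, written
        Writer wap = new Writer();
        int written = writeEdits(edits, wap, 7L);
        System.out.println("written=" + written + " maxSeqNum=" + wap.maxSeqNumWritten);
        // expected: written=1 maxSeqNum=9
      }
    }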
