Repository: hbase Updated Branches: refs/heads/master fe50c6d36 -> bbe29eb93
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbe29eb9/hbase-protocol/src/main/protobuf/WAL.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index 88e94f4..f14d5f4 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -89,13 +89,36 @@ message CompactionDescriptor { required bytes table_name = 1; // TODO: WALKey already stores these, might remove required bytes encoded_region_name = 2; required bytes family_name = 3; - repeated string compaction_input = 4; + repeated string compaction_input = 4; // relative to store dir repeated string compaction_output = 5; - required string store_home_dir = 6; + required string store_home_dir = 6; // relative to region dir optional bytes region_name = 7; // full region name } /** + * Special WAL entry to hold all related to a flush. + */ +message FlushDescriptor { + enum FlushAction { + START_FLUSH = 0; + COMMIT_FLUSH = 1; + ABORT_FLUSH = 2; + } + + message StoreFlushDescriptor { + required bytes family_name = 1; + required string store_home_dir = 2; //relative to region dir + repeated string flush_output = 3; // relative to store dir (if this is a COMMIT_FLUSH) + } + + required FlushAction action = 1; + required bytes table_name = 2; + required bytes encoded_region_name = 3; + optional uint64 flush_sequence_number = 4; + repeated StoreFlushDescriptor store_flushes = 5; +} + +/** * A trailer that is appended to the end of a properly closed HLog WAL file. * If missing, this is either a legacy or a corrupted WAL file. 
*/ http://git-wip-us.apache.org/repos/asf/hbase/blob/bbe29eb9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 80ca848..fbb1ac0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -113,10 +114,13 @@ import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.wal.HLog; @@ -1729,8 +1733,11 @@ public class HRegion implements HeapSize { // , Writable{ status.setStatus("Preparing to flush by snapshotting stores in " + 
getRegionInfo().getEncodedName()); List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size()); + TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>( + Bytes.BYTES_COMPARATOR); long flushSeqId = -1L; + long trxId = 0; try { try { w = mvcc.beginMemstoreInsert(); @@ -1754,12 +1761,39 @@ public class HRegion implements HeapSize { // , Writable{ for (Store s : stores.values()) { totalFlushableSize += s.getFlushableSize(); storeFlushCtxs.add(s.createFlushContext(flushSeqId)); + committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL + } + + // write the snapshot start to WAL + if (wal != null) { + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, + getRegionInfo(), flushSeqId, committedFiles); + trxId = HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), + desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock } // Prepare flush (take a snapshot) for (StoreFlushContext flush : storeFlushCtxs) { flush.prepare(); } + } catch (IOException ex) { + if (wal != null) { + if (trxId > 0) { // check whether we have already written START_FLUSH to WAL + try { + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, + getRegionInfo(), flushSeqId, committedFiles); + HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), + desc, sequenceId, false); + } catch (Throwable t) { + LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + + StringUtils.stringifyException(t)); + // ignore this since we will be aborting the RS with DSE. + } + } + // we have called wal.startCacheFlush(), now we have to abort it + wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); + throw ex; // let upper layers deal with it. 
+ } } finally { this.updatesLock.writeLock().unlock(); } @@ -1767,9 +1801,16 @@ public class HRegion implements HeapSize { // , Writable{ ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize; status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); - // sync unflushed WAL changes when deferred log sync is enabled + // sync unflushed WAL changes // see HBASE-8208 for details - if (wal != null && !shouldSyncLog()) wal.sync(); + if (wal != null) { + try { + wal.sync(); // ensure that flush marker is sync'ed + } catch (IOException ioe) { + LOG.warn("Unexpected exception while log.sync(), ignoring. Exception: " + + StringUtils.stringifyException(ioe)); + } + } // wait for all in-progress transactions to commit to HLog before // we can start the flush. This prevents @@ -1806,16 +1847,27 @@ public class HRegion implements HeapSize { // , Writable{ // Switch snapshot (in memstore) -> new hfile (thus causing // all the store scanners to reset/reseek). + Iterator<Store> it = stores.values().iterator(); // stores.values() and storeFlushCtxs have + // same order for (StoreFlushContext flush : storeFlushCtxs) { boolean needsCompaction = flush.commit(status); if (needsCompaction) { compactionRequested = true; } + committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles()); } storeFlushCtxs.clear(); // Set down the memstore size by amount of flush. this.addAndGetGlobalMemstoreSize(-totalFlushableSize); + + if (wal != null) { + // write flush marker to WAL. If fail, we should throw DroppedSnapshotException + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, + getRegionInfo(), flushSeqId, committedFiles); + HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), + desc, sequenceId, true); + } } catch (Throwable t) { // An exception here means that the snapshot was not persisted. // The hlog needs to be replayed so its content is restored to memstore. 
@@ -1824,6 +1876,16 @@ public class HRegion implements HeapSize { // , Writable{ // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch // all and sundry. if (wal != null) { + try { + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, + getRegionInfo(), flushSeqId, committedFiles); + HLogUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), + desc, sequenceId, false); + } catch (Throwable ex) { + LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + + StringUtils.stringifyException(ex)); + // ignore this since we will be aborting the RS with DSE. + } wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); } DroppedSnapshotException dse = new DroppedSnapshotException("region: " + http://git-wip-us.apache.org/repos/asf/hbase/blob/bbe29eb9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index e059fe8..3f5729a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2035,6 +2035,7 @@ public class HStore implements Store { private long cacheFlushSeqNum; private MemStoreSnapshot snapshot; private List<Path> tempFiles; + private List<Path> committedFiles; private StoreFlusherImpl(long cacheFlushSeqNum) { this.cacheFlushSeqNum = cacheFlushSeqNum; @@ -2047,6 +2048,7 @@ public class HStore implements Store { @Override public void prepare() { this.snapshot = memstore.snapshot(); + committedFiles = new ArrayList<Path>(1); } @Override @@ -2079,14 +2081,20 @@ public class HStore implements Store { } } - if (HStore.this.getCoprocessorHost() != null) { - for (StoreFile sf : storeFiles) { + for (StoreFile sf : 
storeFiles) { + if (HStore.this.getCoprocessorHost() != null) { HStore.this.getCoprocessorHost().postFlush(HStore.this, sf); } + committedFiles.add(sf.getPath()); } // Add new file to store files. Clear snapshot too while we have the Store write lock. return HStore.this.updateStorefiles(storeFiles, snapshot.getId()); } + + @Override + public List<Path> getCommittedFiles() { + return committedFiles; + } } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/bbe29eb9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java index 193a811..fdf1f1e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.monitoring.MonitoredTask; /** @@ -61,4 +63,10 @@ interface StoreFlushContext { * @throws IOException */ boolean commit(MonitoredTask status) throws IOException; + + /** + * Returns the newly committed files from the flush. 
Called only if commit returns true + * @return a list of Paths for new files + */ + List<Path> getCommittedFiles(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/bbe29eb9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java index a0707f7..2c4652b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.util.FSUtils; import com.google.protobuf.TextFormat; @@ -268,4 +269,19 @@ public class HLogUtil { LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); } } + + /** + * Write a flush marker indicating a start / abort or a complete of a region flush + */ + public static long writeFlushMarker(HLog log, HTableDescriptor htd, HRegionInfo info, + final FlushDescriptor f, AtomicLong sequenceId, boolean sync) throws IOException { + TableName tn = TableName.valueOf(f.getTableName().toByteArray()); + HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn); + long trx = log.appendNoSync(htd, info, key, WALEdit.createFlushWALEdit(info, f), sequenceId, false, null); + if (sync) log.sync(trx); + if (LOG.isTraceEnabled()) { + LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); + } + return trx; + } } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/bbe29eb9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java index 24d9d6d..f684d7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java @@ -36,8 +36,10 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.io.Writable; @@ -83,6 +85,8 @@ public class WALEdit implements Writable, HeapSize { public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY"); static final byte [] METAROW = Bytes.toBytes("METAROW"); static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION"); + static final byte [] FLUSH = Bytes.toBytes("HBASE::FLUSH"); + private final int VERSION_2 = -1; private final boolean isReplay; @@ -112,6 +116,10 @@ public class WALEdit implements Writable, HeapSize { return Bytes.equals(METAFAMILY, f); } + public static boolean isMetaEditFamily(Cell cell) { + return CellUtil.matchingFamily(cell, METAFAMILY); + } + /** * @return True when current WALEdit is created by log replay. Replication skips WALEdits from * replay. 
@@ -256,6 +264,19 @@ public class WALEdit implements Writable, HeapSize { return sb.toString(); } + public static WALEdit createFlushWALEdit(HRegionInfo hri, FlushDescriptor f) { + KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, FLUSH, + EnvironmentEdgeManager.currentTimeMillis(), f.toByteArray()); + return new WALEdit().add(kv); + } + + public static FlushDescriptor getFlushDescriptor(Cell cell) throws IOException { + if (CellUtil.matchingColumn(cell, METAFAMILY, FLUSH)) { + return FlushDescriptor.parseFrom(cell.getValue()); + } + return null; + } + /** * Create a compacion WALEdit * @param c @@ -264,7 +285,7 @@ public class WALEdit implements Writable, HeapSize { public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) { byte [] pbbytes = c.toByteArray(); KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION, - System.currentTimeMillis(), pbbytes); + EnvironmentEdgeManager.currentTimeMillis(), pbbytes); return new WALEdit().add(kv); //replication scope null so that this won't be replicated } http://git-wip-us.apache.org/repos/asf/hbase/blob/bbe29eb9/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 15e530a..6e050a0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -35,10 +35,12 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static 
org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.IOException; import java.io.InterruptedIOException; @@ -111,6 +113,9 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.HRegion.RowLock; import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem; @@ -136,6 +141,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.mockito.ArgumentMatcher; import org.mockito.Mockito; import com.google.common.collect.Lists; @@ -787,6 +793,228 @@ public class TestHRegion { } @Test + public void testFlushMarkers() throws Exception { + // tests that flush markers are written to WAL and handled at recovered edits + String method = name.getMethodName(); + TableName tableName = TableName.valueOf(method); + byte[] family = Bytes.toBytes("family"); + Path logDir = TEST_UTIL.getDataTestDirOnTestFS("testRecoveredEditsIgnoreFlushMarkers.log"); + HLog hlog = HLogFactory.createHLog(FILESYSTEM, logDir, UUID.randomUUID().toString(), + TEST_UTIL.getConfiguration()); + + this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family); + try { + Path regiondir = region.getRegionFileSystem().getRegionDir(); + FileSystem fs = 
region.getRegionFileSystem().getFileSystem(); + byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); + + long maxSeqId = 3; + long minSeqId = 0; + + for (long i = minSeqId; i < maxSeqId; i++) { + Put put = new Put(Bytes.toBytes(i)); + put.add(family, Bytes.toBytes(i), Bytes.toBytes(i)); + region.put(put); + region.flushcache(); + } + + // this will create a region with 3 files from flush + assertEquals(3, region.getStore(family).getStorefilesCount()); + List<String> storeFiles = new ArrayList<String>(3); + for (StoreFile sf : region.getStore(family).getStorefiles()) { + storeFiles.add(sf.getPath().getName()); + } + + // now verify that the flush markers are written + hlog.close(); + HLog.Reader reader = HLogFactory.createReader(fs, + fs.listStatus(fs.listStatus(logDir)[0].getPath())[0].getPath(), + TEST_UTIL.getConfiguration()); + + List<HLog.Entry> flushDescriptors = new ArrayList<HLog.Entry>(); + long lastFlushSeqId = -1; + while (true) { + HLog.Entry entry = reader.next(); + if (entry == null) { + break; + } + Cell cell = entry.getEdit().getKeyValues().get(0); + if (WALEdit.isMetaEditFamily(cell)) { + FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell); + assertNotNull(flushDesc); + assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray()); + if (flushDesc.getAction() == FlushAction.START_FLUSH) { + assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId); + } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) { + assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId); + } + lastFlushSeqId = flushDesc.getFlushSequenceNumber(); + assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray()); + assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store + StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0); + assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray()); + assertEquals("family", storeFlushDesc.getStoreHomeDir()); + if 
(flushDesc.getAction() == FlushAction.START_FLUSH) { + assertEquals(0, storeFlushDesc.getFlushOutputCount()); + } else { + assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush + assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0))); + } + + flushDescriptors.add(entry); + } + } + + assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush + + // now write those markers to the recovered edits again. + + Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir); + + Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); + fs.create(recoveredEdits); + HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF); + + for (HLog.Entry entry : flushDescriptors) { + writer.append(entry); + } + writer.close(); + + // close the region now, and reopen again + region.close(); + region = HRegion.openHRegion(region, null); + + // now check whether we have can read back the data from region + for (long i = minSeqId; i < maxSeqId; i++) { + Get get = new Get(Bytes.toBytes(i)); + Result result = region.get(get); + byte[] value = result.getValue(family, Bytes.toBytes(i)); + assertArrayEquals(Bytes.toBytes(i), value); + } + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + class IsFlushWALMarker extends ArgumentMatcher<WALEdit> { + volatile FlushAction[] actions; + public IsFlushWALMarker(FlushAction... 
actions) { + this.actions = actions; + } + @Override + public boolean matches(Object edit) { + List<KeyValue> kvs = ((WALEdit)edit).getKeyValues(); + if (kvs.isEmpty()) { + return false; + } + if (WALEdit.isMetaEditFamily(kvs.get(0))) { + FlushDescriptor desc = null; + try { + desc = WALEdit.getFlushDescriptor(kvs.get(0)); + } catch (IOException e) { + LOG.warn(e); + return false; + } + if (desc != null) { + for (FlushAction action : actions) { + if (desc.getAction() == action) { + return true; + } + } + } + } + return false; + } + public IsFlushWALMarker set(FlushAction... actions) { + this.actions = actions; + return this; + } + } + + @Test + @SuppressWarnings("unchecked") + public void testFlushMarkersWALFail() throws Exception { + // test the cases where the WAL append for flush markers fail. + String method = name.getMethodName(); + TableName tableName = TableName.valueOf(method); + byte[] family = Bytes.toBytes("family"); + + // spy an actual WAL implementation to throw exception (was not able to mock) + Path logDir = TEST_UTIL.getDataTestDirOnTestFS("testRecoveredEditsIgnoreFlushMarkers.log"); + HLog hlog = spy(HLogFactory.createHLog(FILESYSTEM, logDir, UUID.randomUUID().toString(), + TEST_UTIL.getConfiguration())); + + this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family); + try { + int i = 0; + Put put = new Put(Bytes.toBytes(i)); + put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal + put.add(family, Bytes.toBytes(i), Bytes.toBytes(i)); + region.put(put); + + // 1. 
Test case where START_FLUSH throws exception + IsFlushWALMarker isFlushWALMarker = new IsFlushWALMarker(FlushAction.START_FLUSH); + + // throw exceptions if the WalEdit is a start flush action + when(hlog.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(), + (WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(), + (List<KeyValue>)any())) + .thenThrow(new IOException("Fail to append flush marker")); + + // start cache flush will throw exception + try { + region.flushcache(); + fail("This should have thrown exception"); + } catch (DroppedSnapshotException unexpected) { + // this should not be a dropped snapshot exception. Meaning that RS will not abort + throw unexpected; + } catch (IOException expected) { + // expected + } + + // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception + isFlushWALMarker.set(FlushAction.COMMIT_FLUSH); + + try { + region.flushcache(); + fail("This should have thrown exception"); + } catch (DroppedSnapshotException expected) { + // we expect this exception, since we were able to write the snapshot, but failed to + // write the flush marker to WAL + } catch (IOException unexpected) { + throw unexpected; + } + + region.close(); + this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, hlog, family); + region.put(put); + + // 3. Test case where ABORT_FLUSH will throw exception. + // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with + // DroppedSnapshotException. 
Below COMMIT_FLUSH will cause flush to abort + isFlushWALMarker.set(FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH); + + try { + region.flushcache(); + fail("This should have thrown exception"); + } catch (DroppedSnapshotException expected) { + // we expect this exception, since we were able to write the snapshot, but failed to + // write the flush marker to WAL + } catch (IOException unexpected) { + throw unexpected; + } + + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } + } + + @Test public void testGetWhileRegionClose() throws IOException { TableName tableName = TableName.valueOf(name.getMethodName()); Configuration hc = initSplit();