Repository: hbase Updated Branches: refs/heads/master 7cc458e12 -> d09200876
HBASE-18221 Switch from pread to stream should happen under HStore's reentrant lock (Ram) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d0920087 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d0920087 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d0920087 Branch: refs/heads/master Commit: d092008766c460de329d14d40e9cfd2377dcaf01 Parents: 7cc458e Author: Ramkrishna <ramkrishna.s.vasude...@intel.com> Authored: Fri Jun 23 10:32:29 2017 +0530 Committer: Ramkrishna <ramkrishna.s.vasude...@intel.com> Committed: Fri Jun 23 10:32:29 2017 +0530 ---------------------------------------------------------------------- .../regionserver/DefaultStoreFileManager.java | 16 +- .../hadoop/hbase/regionserver/HStore.java | 46 ++++++ .../apache/hadoop/hbase/regionserver/Store.java | 28 ++++ .../hbase/regionserver/StoreFileManager.java | 6 + .../hadoop/hbase/regionserver/StoreScanner.java | 26 ++-- .../regionserver/StripeStoreFileManager.java | 5 + .../hadoop/hbase/regionserver/TestStore.java | 146 ++++++++++++++++--- 7 files changed, 235 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index f4f9aa6..915d62f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -27,10 +27,6 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; -import com.google.common.collect.ImmutableCollection; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -40,6 +36,10 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + /** * Default implementation of StoreFileManager. Not thread-safe. */ @@ -117,6 +117,14 @@ class DefaultStoreFileManager implements StoreFileManager { } @Override + public final int getCompactedFilesCount() { + if (compactedfiles == null) { + return 0; + } + return compactedfiles.size(); + } + + @Override public void addCompactionResults( Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) { ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles); http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index ed0f201..9ab52c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NavigableSet; import java.util.Set; import java.util.concurrent.Callable; @@ -727,6 +728,11 @@ public class HStore implements Store { return this.storeEngine.getStoreFileManager().getStorefiles(); } + @Override + public Collection<StoreFile> getCompactedFiles() { + return this.storeEngine.getStoreFileManager().getCompactedfiles(); + } + /** * This throws a WrongRegionException if the HFile does not fit in this region, or an * InvalidHFileException if the HFile is not valid. @@ -1928,6 +1934,41 @@ public class HStore implements Store { } @Override + public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners, + boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, + byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, + boolean includeMemstoreScanner) throws IOException { + this.lock.readLock().lock(); + try { + Map<String, StoreFile> name2File = + new HashMap<>(getStorefilesCount() + getCompactedFilesCount()); + for (StoreFile file : getStorefiles()) { + name2File.put(file.getFileInfo().getActiveFileName(), file); + } + if (getCompactedFiles() != null) { + for (StoreFile file : getCompactedFiles()) { + name2File.put(file.getFileInfo().getActiveFileName(), file); + } + } + List<StoreFile> filesToReopen = new ArrayList<>(); + for (KeyValueScanner kvs : currentFileScanners) { + assert kvs.isFileScanner(); + if (kvs.peek() == null) { + continue; + } + filesToReopen.add(name2File.get(kvs.getFilePath().getName())); + } + if (filesToReopen.isEmpty()) { + return null; + } + return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow, + includeStartRow, stopRow, includeStopRow, readPt, false); + } finally { + this.lock.readLock().unlock(); + } + } + + @Override public String toString() { return this.getColumnFamilyName(); } @@ -1938,6 +1979,11 @@ public class HStore implements Store { } @Override + public int getCompactedFilesCount() { + return this.storeEngine.getStoreFileManager().getCompactedFilesCount(); + } + + @Override public long getMaxStoreFileAge() { long earliestTS = Long.MAX_VALUE; for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index f5e90eb..c0df66a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -61,6 +61,8 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf Collection<StoreFile> getStorefiles(); + Collection<StoreFile> getCompactedFiles(); + /** * Close all the readers We don't need to worry about subsequent requests because the Region * holds a write lock that will prevent any more reads or writes. @@ -116,6 +118,27 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf boolean includeStopRow, long readPt) throws IOException; /** + * Recreates the scanners on the current list of active store file scanners + * @param currentFileScanners the current set of active store file scanners + * @param cacheBlocks cache the blocks or not + * @param usePread use pread or not + * @param isCompaction is the scanner for compaction + * @param matcher the scan query matcher + * @param startRow the scan's start row + * @param includeStartRow should the scan include the start row + * @param stopRow the scan's stop row + * @param includeStopRow should the scan include the stop row + * @param readPt the read point of the current scane + * @param includeMemstoreScanner whether the current scanner should include memstorescanner + * @return list of scanners recreated on the current Scanners + * @throws IOException + */ + List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners, + boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, + byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, + boolean includeMemstoreScanner) throws IOException; + + /** * Create scanners on the given files and if needed on the memstore with no filtering based on TTL * (that happens further down the line). * @param files the list of files on which the scanners has to be created @@ -367,6 +390,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf int getStorefilesCount(); /** + * @return Count of compacted store files + */ + int getCompactedFilesCount(); + + /** * @return Max age of store files in this store */ long getMaxStoreFileAge(); http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java index 933849c..eb760ed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java @@ -104,6 +104,12 @@ public interface StoreFileManager { int getStorefileCount(); /** + * Returns the number of compacted files. + * @return The number of files. + */ + int getCompactedFilesCount(); + + /** * Gets the store files to scan for a Scan or Get request. * @param startRow Start row of the request. * @param stopRow Stop row of the request. http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 9849c93..11301d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -966,7 +966,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return heap.reseek(kv); } - private void trySwitchToStreamRead() { + @VisibleForTesting + void trySwitchToStreamRead() { if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null || bytesRead < preadMaxBytes) { return; @@ -977,34 +978,27 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } scanUsePread = false; Cell lastTop = heap.peek(); - Map<String, StoreFile> name2File = new HashMap<>(store.getStorefilesCount()); - for (StoreFile file : store.getStorefiles()) { - name2File.put(file.getFileInfo().getActiveFileName(), file); - } - List<StoreFile> filesToReopen = new ArrayList<>(); List<KeyValueScanner> memstoreScanners = new ArrayList<>(); List<KeyValueScanner> scannersToClose = new ArrayList<>(); for (KeyValueScanner kvs : currentScanners) { if (!kvs.isFileScanner()) { + // collect memstorescanners here memstoreScanners.add(kvs); } else { scannersToClose.add(kvs); - if (kvs.peek() == null) { - continue; - } - filesToReopen.add(name2File.get(kvs.getFilePath().getName())); } } - if (filesToReopen.isEmpty()) { - return; - } List<KeyValueScanner> fileScanners = null; List<KeyValueScanner> newCurrentScanners; KeyValueHeap newHeap; try { - fileScanners = - store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(), - scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false); + // recreate the scanners on the current file scanners + fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, + matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), + scan.includeStopRow(), readPt, false); + if (fileScanners == null) { + return; + } seekScanners(fileScanners, lastTop, false, parallelSeekEnabled); newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size()); newCurrentScanners.addAll(fileScanners); http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java index 3c7469e..18a6eec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java @@ -147,6 +147,11 @@ public class StripeStoreFileManager } @Override + public int getCompactedFilesCount() { + return state.allCompactedFilesCached.size(); + } + + @Override public void insertNewFiles(Collection<StoreFile> sfs) throws IOException { CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true); // Passing null does not cause NPE?? http://git-wip-us.apache.org/repos/asf/hbase/blob/d0920087/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 22539c5..2318414 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -45,6 +46,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.apache.commons.logging.Log; @@ -104,7 +106,6 @@ import org.junit.rules.TestName; import org.mockito.Mockito; import com.google.common.collect.Lists; -import java.util.concurrent.atomic.AtomicInteger; /** * Test class for the Store @@ -161,19 +162,19 @@ public class TestStore { init(methodName, TEST_UTIL.getConfiguration()); } - private void init(String methodName, Configuration conf) + private Store init(String methodName, Configuration conf) throws IOException { HColumnDescriptor hcd = new HColumnDescriptor(family); // some of the tests write 4 versions and then flush // (with HBASE-4241, lower versions are collected on flush) hcd.setMaxVersions(4); - init(methodName, conf, hcd); + return init(methodName, conf, hcd); } - private void init(String methodName, Configuration conf, + private Store init(String methodName, Configuration conf, HColumnDescriptor hcd) throws IOException { HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); - init(methodName, conf, htd, hcd); + return init(methodName, conf, htd, hcd); } private Store init(String methodName, Configuration conf, HTableDescriptor htd, @@ -184,6 +185,11 @@ public class TestStore { @SuppressWarnings("deprecation") private Store init(String methodName, Configuration conf, HTableDescriptor htd, HColumnDescriptor hcd, MyScannerHook hook) throws IOException { + return init(methodName, conf, htd, hcd, hook, false); + } + @SuppressWarnings("deprecation") + private Store init(String methodName, Configuration conf, HTableDescriptor htd, + HColumnDescriptor hcd, MyScannerHook hook, boolean switchToPread) throws IOException { //Setting up a Store Path basedir = new Path(DIR+methodName); Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); @@ -198,7 +204,8 @@ public class TestStore { } else { htd.addFamily(hcd); } - ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, + MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null); HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); final Configuration walConf = new Configuration(conf); FSUtils.setRootDir(walConf, basedir); @@ -208,7 +215,7 @@ public class TestStore { if (hook == null) { store = new HStore(region, hcd, conf); } else { - store = new MyStore(region, hcd, conf, hook); + store = new MyStore(region, hcd, conf, hook, switchToPread); } return store; } @@ -833,9 +840,10 @@ public class TestStore { public static class DummyStoreEngine extends DefaultStoreEngine { public static DefaultCompactor lastCreatedCompactor = null; + @Override - protected void createComponents( - Configuration conf, Store store, CellComparator comparator) throws IOException { + protected void createComponents(Configuration conf, Store store, CellComparator comparator) + throws IOException { super.createComponents(conf, store, comparator); lastCreatedCompactor = this.compactor; } @@ -1039,6 +1047,13 @@ public class TestStore { return c; } + private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value) + throws IOException { + Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value); + CellUtil.setSequenceId(c, sequenceId); + return c; + } + @Test public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException { Configuration conf = HBaseConfiguration.create(); @@ -1269,35 +1284,130 @@ public class TestStore { storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); } - private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException { + private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) + throws IOException { HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table)); HColumnDescriptor hcd = new HColumnDescriptor(family); hcd.setMaxVersions(5); return (MyStore) init(methodName, conf, htd, hcd, hook); } - private static class MyStore extends HStore { + class MyStore extends HStore { private final MyScannerHook hook; - MyStore(final HRegion region, final HColumnDescriptor family, - final Configuration confParam, MyScannerHook hook) throws IOException { + + MyStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam, + MyScannerHook hook, boolean switchToPread) throws IOException { super(region, family, confParam); this.hook = hook; } @Override public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks, - boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, - boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, - boolean includeMemstoreScanner) throws IOException { + boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, + boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, + boolean includeMemstoreScanner) throws IOException { hook.hook(this); - return super.getScanners(files, cacheBlocks, usePread, - isCompaction, matcher, startRow, true, stopRow, false, readPt, includeMemstoreScanner); + return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, + stopRow, false, readPt, includeMemstoreScanner); } } private interface MyScannerHook { void hook(MyStore store) throws IOException; } + @Test + public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception { + int flushSize = 500; + Configuration conf = HBaseConfiguration.create(); + conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName()); + conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0); + // Set the lower threshold to invoke the "MERGE" policy + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); + MyStore store = initMyStore(name.getMethodName(), conf, new MyScannerHook() { + @Override + public void hook(org.apache.hadoop.hbase.regionserver.TestStore.MyStore store) + throws IOException { + } + }); + MemstoreSize memStoreSize = new MemstoreSize(); + long ts = System.currentTimeMillis(); + long seqID = 1l; + // Add some data to the region and do some flushes + for (int i = 1; i < 10; i++) { + store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), + memStoreSize); + } + // flush them + flushStore(store, seqID); + for (int i = 11; i < 20; i++) { + store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), + memStoreSize); + } + // flush them + flushStore(store, seqID); + for (int i = 21; i < 30; i++) { + store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), + memStoreSize); + } + // flush them + flushStore(store, seqID); + + assertEquals(3, store.getStorefilesCount()); + ScanInfo scanInfo = store.getScanInfo(); + Scan scan = new Scan(); + scan.addFamily(family); + Collection<StoreFile> storefiles2 = store.getStorefiles(); + ArrayList<StoreFile> actualStorefiles = Lists.newArrayList(storefiles2); + StoreScanner storeScanner = + (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE); + // get the current heap + KeyValueHeap heap = storeScanner.heap; + // create more store files + for (int i = 31; i < 40; i++) { + store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), + memStoreSize); + } + // flush them + flushStore(store, seqID); + + for (int i = 41; i < 50; i++) { + store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")), + memStoreSize); + } + // flush them + flushStore(store, seqID); + storefiles2 = store.getStorefiles(); + ArrayList<StoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2); + actualStorefiles1.removeAll(actualStorefiles); + // Do compaction + List<Exception> exceptions = new ArrayList<Exception>(); + MyThread thread = new MyThread(storeScanner); + thread.start(); + store.replaceStoreFiles(actualStorefiles, actualStorefiles1); + thread.join(); + KeyValueHeap heap2 = thread.getHeap(); + assertFalse(heap.equals(heap2)); + } + + private static class MyThread extends Thread { + private StoreScanner scanner; + private KeyValueHeap heap; + + public MyThread(StoreScanner scanner) { + this.scanner = scanner; + } + + public KeyValueHeap getHeap() { + return this.heap; + } + + public void run() { + scanner.trySwitchToStreamRead(); + heap = scanner.heap; + } + } + private static class MyMemStoreCompactor extends MemStoreCompactor { private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0); private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);