HBASE-17914 Create a new reader instead of cloning a new StoreFile when compacting
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/66b616d7 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/66b616d7 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/66b616d7 Branch: refs/heads/hbase-12439 Commit: 66b616d7a3d6f4ad6d20962e2dfc0c82a4092ddb Parents: 719a30b Author: zhangduo <zhang...@apache.org> Authored: Mon Apr 17 22:53:49 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Apr 19 09:26:33 2017 +0800 ---------------------------------------------------------------------- .../hbase/io/FSDataInputStreamWrapper.java | 63 +++--- .../org/apache/hadoop/hbase/io/FileLink.java | 14 +- .../hadoop/hbase/io/HalfStoreFileReader.java | 13 +- .../hadoop/hbase/io/hfile/CacheConfig.java | 9 +- .../org/apache/hadoop/hbase/io/hfile/HFile.java | 85 ++++---- .../hbase/io/hfile/HFilePrettyPrinter.java | 2 +- .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 26 +-- .../hbase/mapreduce/LoadIncrementalHFiles.java | 45 ++-- .../procedure/MergeTableRegionsProcedure.java | 9 +- .../procedure/SplitTableRegionProcedure.java | 8 +- .../apache/hadoop/hbase/mob/CachedMobFile.java | 4 +- .../org/apache/hadoop/hbase/mob/MobFile.java | 8 +- .../org/apache/hadoop/hbase/mob/MobUtils.java | 13 +- .../compactions/PartitionedMobCompactor.java | 26 +-- .../regionserver/DefaultStoreFileManager.java | 2 +- .../hadoop/hbase/regionserver/HMobStore.java | 6 +- .../hadoop/hbase/regionserver/HRegion.java | 4 +- .../hbase/regionserver/HRegionFileSystem.java | 6 +- .../hadoop/hbase/regionserver/HStore.java | 19 +- .../regionserver/ReversedStoreScanner.java | 2 +- .../hadoop/hbase/regionserver/StoreFile.java | 216 ++++++++++++------- .../hbase/regionserver/StoreFileInfo.java | 21 +- .../hbase/regionserver/StoreFileReader.java | 86 ++++---- .../hbase/regionserver/StoreFileScanner.java | 50 +++-- .../hadoop/hbase/regionserver/StoreScanner.java | 6 +- .../regionserver/compactions/Compactor.java | 44 +--- 
.../hadoop/hbase/util/CompressionTest.java | 2 +- .../org/apache/hadoop/hbase/util/HBaseFsck.java | 6 +- .../hbase/util/hbck/HFileCorruptionChecker.java | 4 +- .../hbase/HFilePerformanceEvaluation.java | 2 +- .../hadoop/hbase/client/TestFromClientSide.java | 1 + .../hbase/io/TestHalfStoreFileReader.java | 192 ++++++++--------- .../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 2 +- .../apache/hadoop/hbase/io/hfile/TestHFile.java | 8 +- .../hbase/io/hfile/TestHFileBlockIndex.java | 6 +- .../hbase/io/hfile/TestHFileEncryption.java | 6 +- .../TestHFileInlineToRootChunkConversion.java | 2 +- .../hadoop/hbase/io/hfile/TestPrefetch.java | 2 +- .../hadoop/hbase/io/hfile/TestReseekTo.java | 4 +- .../hfile/TestSeekBeforeWithInlineBlocks.java | 2 +- .../hadoop/hbase/io/hfile/TestSeekTo.java | 8 +- .../hbase/mapreduce/TestHFileOutputFormat2.java | 10 +- .../TestImportTSVWithVisibilityLabels.java | 2 +- .../hadoop/hbase/mapreduce/TestImportTsv.java | 2 +- .../mapreduce/TestLoadIncrementalHFiles.java | 4 +- .../apache/hadoop/hbase/mob/TestMobFile.java | 8 +- .../hbase/mob/compactions/TestMobCompactor.java | 9 +- .../TestPartitionedMobCompactor.java | 18 +- .../regionserver/DataBlockEncodingTool.java | 7 +- .../EncodedSeekPerformanceTest.java | 12 +- .../hbase/regionserver/MockStoreFile.java | 25 ++- .../regionserver/TestCacheOnWriteInSchema.java | 6 +- .../regionserver/TestCompactionPolicy.java | 3 - .../regionserver/TestCompoundBloomFilter.java | 7 +- .../regionserver/TestEncryptionKeyRotation.java | 2 +- .../TestEncryptionRandomKeying.java | 2 +- .../hbase/regionserver/TestFSErrorsExposed.java | 12 +- .../regionserver/TestMobStoreCompaction.java | 7 +- .../regionserver/TestReversibleScanners.java | 33 ++- .../hadoop/hbase/regionserver/TestStore.java | 2 +- .../hbase/regionserver/TestStoreFile.java | 120 ++++++----- .../TestStoreFileScannerWithTagCompression.java | 10 +- .../regionserver/compactions/TestCompactor.java | 3 - .../compactions/TestStripeCompactionPolicy.java | 3 - 
.../hbase/util/TestHBaseFsckEncryption.java | 2 +- .../hadoop/hbase/spark/BulkLoadSuite.scala | 8 +- 66 files changed, 701 insertions(+), 650 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java index b06be6b..055e46a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java @@ -17,11 +17,14 @@ */ package org.apache.hadoop.hbase.io; +import java.io.Closeable; import java.io.IOException; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.fs.HFileSystem; import com.google.common.annotations.VisibleForTesting; @@ -31,11 +34,14 @@ import com.google.common.annotations.VisibleForTesting; * as well as closing streams. Initialization is not thread-safe, but normal operation is; * see method comments. */ -public class FSDataInputStreamWrapper { +@InterfaceAudience.Private +public class FSDataInputStreamWrapper implements Closeable { private final HFileSystem hfs; private final Path path; private final FileLink link; private final boolean doCloseStreams; + private final boolean dropBehind; + private final long readahead; /** Two stream handles, one with and one without FS-level checksum. 
* HDFS checksum setting is on FS level, not single read level, so you have to keep two @@ -75,43 +81,52 @@ public class FSDataInputStreamWrapper { private volatile int hbaseChecksumOffCount = -1; public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { - this(fs, null, path, false); + this(fs, path, false, -1L); } - public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException { - this(fs, null, path, dropBehind); + public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind, long readahead) throws IOException { + this(fs, null, path, dropBehind, readahead); } - public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException { - this(fs, link, null, false); - } public FSDataInputStreamWrapper(FileSystem fs, FileLink link, - boolean dropBehind) throws IOException { - this(fs, link, null, dropBehind); + boolean dropBehind, long readahead) throws IOException { + this(fs, link, null, dropBehind, readahead); } - private FSDataInputStreamWrapper(FileSystem fs, FileLink link, - Path path, boolean dropBehind) throws IOException { + private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean dropBehind, + long readahead) throws IOException { assert (path == null) != (link == null); this.path = path; this.link = link; this.doCloseStreams = true; + this.dropBehind = dropBehind; + this.readahead = readahead; // If the fs is not an instance of HFileSystem, then create an instance of HFileSystem // that wraps over the specified fs. In this case, we will not be able to avoid // checksumming inside the filesystem. - this.hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs : new HFileSystem(fs); + this.hfs = (fs instanceof HFileSystem) ? (HFileSystem) fs : new HFileSystem(fs); // Initially we are going to read the tail block. Open the reader w/FS checksum. this.useHBaseChecksumConfigured = this.useHBaseChecksum = false; this.stream = (link != null) ? 
link.open(hfs) : hfs.open(path); + setStreamOptions(stream); + } + + private void setStreamOptions(FSDataInputStream in) { try { this.stream.setDropBehind(dropBehind); } catch (Exception e) { // Skipped. } + if (readahead >= 0) { + try { + this.stream.setReadahead(readahead); + } catch (Exception e) { + // Skipped. + } + } } - /** * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any * reads finish and before any other reads start (what happens in reality is we read the @@ -127,6 +142,7 @@ public class FSDataInputStreamWrapper { if (useHBaseChecksum) { FileSystem fsNc = hfs.getNoChecksumFs(); this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path); + setStreamOptions(streamNoFsChecksum); this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum; // Close the checksum stream; we will reopen it if we get an HBase checksum failure. this.stream.close(); @@ -150,6 +166,8 @@ public class FSDataInputStreamWrapper { link = null; hfs = null; useHBaseChecksumConfigured = useHBaseChecksum = false; + dropBehind = false; + readahead = 0; } /** @@ -201,19 +219,14 @@ public class FSDataInputStreamWrapper { } /** Close stream(s) if necessary. */ - public void close() throws IOException { - if (!doCloseStreams) return; - try { - if (stream != streamNoFsChecksum && streamNoFsChecksum != null) { - streamNoFsChecksum.close(); - streamNoFsChecksum = null; - } - } finally { - if (stream != null) { - stream.close(); - stream = null; - } + @Override + public void close() { + if (!doCloseStreams) { + return; } + // we do not care about the close exception as it is for reading, no data loss issue. 
+ IOUtils.closeQuietly(streamNoFsChecksum); + IOUtils.closeQuietly(stream); } public HFileSystem getHfs() { http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java index ca0dfbc..8a79efb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java @@ -29,6 +29,8 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.fs.CanSetDropBehind; +import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; @@ -99,7 +101,7 @@ public class FileLink { * and the alternative locations, when the file is moved. 
*/ private static class FileLinkInputStream extends InputStream - implements Seekable, PositionedReadable { + implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead { private FSDataInputStream in = null; private Path currentPath = null; private long pos = 0; @@ -306,6 +308,16 @@ public class FileLink { } throw new FileNotFoundException("Unable to open link: " + fileLink); } + + @Override + public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException { + in.setReadahead(readahead); + } + + @Override + public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException { + in.setDropBehind(dropCache); + } } private Path[] locations = null; http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index a4a281e..c4dbc39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -72,10 +73,10 @@ public class HalfStoreFileReader extends StoreFileReader { * @param conf Configuration * @throws IOException */ - public HalfStoreFileReader(final FileSystem fs, final Path p, - final CacheConfig cacheConf, final Reference r, final Configuration conf) + public HalfStoreFileReader(FileSystem fs, Path p, CacheConfig cacheConf, Reference r, + boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared, 
Configuration conf) throws IOException { - super(fs, p, cacheConf, conf); + super(fs, p, cacheConf, isPrimaryReplicaStoreFile, refCount, shared, conf); // This is not actual midkey for this half-file; its just border // around which we split top and bottom. Have to look in files to find // actual last and first keys for bottom and top halves. Half-files don't @@ -99,9 +100,9 @@ public class HalfStoreFileReader extends StoreFileReader { * @throws IOException */ public HalfStoreFileReader(final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, - long size, final CacheConfig cacheConf, final Reference r, final Configuration conf) - throws IOException { - super(fs, p, in, size, cacheConf, conf); + long size, final CacheConfig cacheConf, final Reference r, boolean isPrimaryReplicaStoreFile, + AtomicInteger refCount, boolean shared, final Configuration conf) throws IOException { + super(fs, p, in, size, cacheConf, isPrimaryReplicaStoreFile, refCount, shared, conf); // This is not actual midkey for this half-file; its just border // around which we split top and bottom. Have to look in files to find // actual last and first keys for bottom and top halves. Half-files don't http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 4db60b5..791445b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -283,11 +283,10 @@ public class CacheConfig { } /** - * Create a block cache configuration with the specified cache and - * configuration parameters. 
+ * Create a block cache configuration with the specified cache and configuration parameters. * @param blockCache reference to block cache, null if completely disabled * @param cacheDataOnRead whether DATA blocks should be cached on read (we always cache INDEX - * blocks and BLOOM blocks; this cannot be disabled). + * blocks and BLOOM blocks; this cannot be disabled). * @param inMemory whether blocks should be flagged as in-memory * @param cacheDataOnWrite whether data blocks should be cached on write * @param cacheIndexesOnWrite whether index blocks should be cached on write @@ -296,7 +295,9 @@ public class CacheConfig { * @param cacheDataCompressed whether to store blocks as compressed in the cache * @param prefetchOnOpen whether to prefetch blocks upon open * @param cacheDataInL1 If more than one cache tier deployed, if true, cache this column families - * data blocks up in the L1 tier. + * data blocks up in the L1 tier. + * @param dropBehindCompaction indicate that we should set drop behind to true when open a store + * file reader for compaction */ CacheConfig(final BlockCache blockCache, final boolean cacheDataOnRead, final boolean inMemory, http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index c5b334a..0887ee8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -36,6 +36,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.atomic.LongAdder; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ 
-462,8 +463,6 @@ public class HFile { boolean isPrimaryReplicaReader(); - void setPrimaryReplicaReader(boolean isPrimaryReplicaReader); - boolean shouldIncludeMemstoreTS(); boolean isDecodeMemstoreTS(); @@ -486,33 +485,32 @@ public class HFile { * @param size max size of the trailer. * @param cacheConf Cache configuation values, cannot be null. * @param hfs + * @param primaryReplicaReader true if this is a reader for primary replica * @return an appropriate instance of HFileReader * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH", justification="Intentional") - private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis, - long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException { + private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis, long size, + CacheConfig cacheConf, HFileSystem hfs, boolean primaryReplicaReader, Configuration conf) + throws IOException { FixedFileTrailer trailer = null; try { boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum(); assert !isHBaseChecksum; // Initially we must read with FS checksum. trailer = FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size); switch (trailer.getMajorVersion()) { - case 2: - LOG.debug("Opening HFile v2 with v3 reader"); - // Fall through. FindBugs: SF_SWITCH_FALLTHROUGH - case 3 : - return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs, conf); - default: - throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion()); + case 2: + LOG.debug("Opening HFile v2 with v3 reader"); + // Fall through. 
FindBugs: SF_SWITCH_FALLTHROUGH + case 3: + return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs, + primaryReplicaReader, conf); + default: + throw new IllegalArgumentException("Invalid HFile version " + trailer.getMajorVersion()); } } catch (Throwable t) { - try { - fsdis.close(); - } catch (Throwable t2) { - LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2); - } + IOUtils.closeQuietly(fsdis); throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t); } } @@ -523,13 +521,13 @@ public class HFile { * @param fsdis a stream of path's file * @param size max size of the trailer. * @param cacheConf Cache configuration for hfile's contents + * @param primaryReplicaReader true if this is a reader for primary replica * @param conf Configuration * @return A version specific Hfile Reader * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException */ - @SuppressWarnings("resource") - public static Reader createReader(FileSystem fs, Path path, - FSDataInputStreamWrapper fsdis, long size, CacheConfig cacheConf, Configuration conf) + public static Reader createReader(FileSystem fs, Path path, FSDataInputStreamWrapper fsdis, + long size, CacheConfig cacheConf, boolean primaryReplicaReader, Configuration conf) throws IOException { HFileSystem hfs = null; @@ -540,9 +538,9 @@ public class HFile { if (!(fs instanceof HFileSystem)) { hfs = new HFileSystem(fs); } else { - hfs = (HFileSystem)fs; + hfs = (HFileSystem) fs; } - return pickReaderVersion(path, fsdis, size, cacheConf, hfs, conf); + return pickReaderVersion(path, fsdis, size, cacheConf, hfs, primaryReplicaReader, conf); } /** @@ -553,35 +551,39 @@ public class HFile { * @throws IOException Will throw a CorruptHFileException * (DoNotRetryIOException subtype) if hfile is corrupt/invalid. 
*/ - public static Reader createReader( - FileSystem fs, Path path, Configuration conf) throws IOException { - return createReader(fs, path, CacheConfig.DISABLED, conf); - } + public static Reader createReader(FileSystem fs, Path path, Configuration conf) + throws IOException { + // The primaryReplicaReader is mainly used for constructing block cache key, so if we do not use + // block cache then it is OK to set it as any value. We use true here. + return createReader(fs, path, CacheConfig.DISABLED, true, conf); + } /** - * * @param fs filesystem * @param path Path to file to read - * @param cacheConf This must not be null. @see {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)} + * @param cacheConf This must not be null. @see + * {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)} + * @param primaryReplicaReader true if this is a reader for primary replica * @return an active Reader instance - * @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile is corrupt/invalid. + * @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile + * is corrupt/invalid. 
*/ - public static Reader createReader( - FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException { + public static Reader createReader(FileSystem fs, Path path, CacheConfig cacheConf, + boolean primaryReplicaReader, Configuration conf) throws IOException { Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf"); FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path); - return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(), - cacheConf, stream.getHfs(), conf); + return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(), cacheConf, + stream.getHfs(), primaryReplicaReader, conf); } /** * This factory method is used only by unit tests */ - static Reader createReaderFromStream(Path path, - FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf) - throws IOException { + @VisibleForTesting + static Reader createReaderFromStream(Path path, FSDataInputStream fsdis, long size, + CacheConfig cacheConf, Configuration conf) throws IOException { FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis); - return pickReaderVersion(path, wrapper, size, cacheConf, null, conf); + return pickReaderVersion(path, wrapper, size, cacheConf, null, true, conf); } /** @@ -606,22 +608,13 @@ public class HFile { throws IOException { final Path path = fileStatus.getPath(); final long size = fileStatus.getLen(); - FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path); - try { + try (FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path)) { boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum(); assert !isHBaseChecksum; // Initially we must read with FS checksum. 
FixedFileTrailer.readFromStream(fsdis.getStream(isHBaseChecksum), size); return true; } catch (IllegalArgumentException e) { return false; - } catch (IOException e) { - throw e; - } finally { - try { - fsdis.close(); - } catch (Throwable t) { - LOG.warn("Error closing fsdis FSDataInputStreamWrapper: " + path, t); - } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java index 030a25e..43b5c24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java @@ -306,7 +306,7 @@ public class HFilePrettyPrinter extends Configured implements Tool { return -2; } - HFile.Reader reader = HFile.createReader(fs, file, new CacheConfig(getConf()), getConf()); + HFile.Reader reader = HFile.createReader(fs, file, new CacheConfig(getConf()), true, getConf()); Map<byte[], byte[]> fileInfo = reader.loadFileInfo(); http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 4e8cbaa..f0a1fa1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -85,7 +85,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { /** Filled when we read in the trailer. 
*/ private final Compression.Algorithm compressAlgo; - private boolean isPrimaryReplicaReader; + private final boolean primaryReplicaReader; /** * What kind of data block encoding should be used while reading, writing, @@ -156,6 +156,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { /** Minor versions starting with this number have faked index key */ static final int MINOR_VERSION_WITH_FAKED_KEY = 3; + @VisibleForTesting + @Deprecated + public HFileReaderImpl(Path path, FixedFileTrailer trailer, FSDataInputStreamWrapper fsdis, + long fileSize, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) + throws IOException { + this(path, trailer, fsdis, fileSize, cacheConf, hfs, true, conf); + } + /** * Opens a HFile. You must load the index before you can use it by calling * {@link #loadFileInfo()}. @@ -175,11 +183,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * Configuration */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD") - public HFileReaderImpl(final Path path, FixedFileTrailer trailer, - final FSDataInputStreamWrapper fsdis, - final long fileSize, final CacheConfig cacheConf, final HFileSystem hfs, - final Configuration conf) - throws IOException { + public HFileReaderImpl(Path path, FixedFileTrailer trailer, FSDataInputStreamWrapper fsdis, + long fileSize, CacheConfig cacheConf, HFileSystem hfs, boolean primaryReplicaReader, + Configuration conf) throws IOException { this.trailer = trailer; this.compressAlgo = trailer.getCompressionCodec(); this.cacheConf = cacheConf; @@ -187,6 +193,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { this.path = path; this.name = path.getName(); this.conf = conf; + this.primaryReplicaReader = primaryReplicaReader; checkFileVersion(); this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer); this.fsBlockReader = new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext); @@ 
-453,12 +460,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public boolean isPrimaryReplicaReader() { - return isPrimaryReplicaReader; - } - - @Override - public void setPrimaryReplicaReader(boolean isPrimaryReplicaReader) { - this.isPrimaryReplicaReader = isPrimaryReplicaReader; + return primaryReplicaReader; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 19daeed..3af4290 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -20,6 +20,11 @@ package org.apache.hadoop.hbase.mapreduce; import static java.lang.String.format; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -27,7 +32,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; @@ -63,9 +67,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.impl.BackupManager; -import 
org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ClientServiceCallable; @@ -99,10 +100,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Tool to load the output of HFileOutputFormat into an existing table. */ @@ -937,8 +934,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } HFile.Reader hfr = null; try { - hfr = HFile.createReader(fs, hfilePath, - new CacheConfig(getConf()), getConf()); + hfr = HFile.createReader(fs, hfilePath, new CacheConfig(getConf()), true, getConf()); } catch (FileNotFoundException fnfe) { LOG.debug("encountered", fnfe); return new Pair<>(null, hfilePath.getName()); @@ -1105,7 +1101,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { HalfStoreFileReader halfReader = null; StoreFileWriter halfWriter = null; try { - halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf); + halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true, + new AtomicInteger(0), true, conf); Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo(); int blocksize = familyDescriptor.getBlocksize(); @@ -1213,30 +1210,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool { public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus) throws IOException { Path hfile = hfileStatus.getPath(); - HFile.Reader reader = HFile.createReader(fs, hfile, - new CacheConfig(getConf()), getConf()); - try { + try (HFile.Reader reader = + HFile.createReader(fs, hfile, new CacheConfig(getConf()), true, getConf())) { if 
(hcd.getCompressionType() != reader.getFileContext().getCompression()) { hcd.setCompressionType(reader.getFileContext().getCompression()); - LOG.info("Setting compression " + hcd.getCompressionType().name() + - " for family " + hcd.toString()); + LOG.info("Setting compression " + hcd.getCompressionType().name() + " for family " + + hcd.toString()); } reader.loadFileInfo(); byte[] first = reader.getFirstRowKey(); - byte[] last = reader.getLastRowKey(); + byte[] last = reader.getLastRowKey(); - LOG.info("Trying to figure out region boundaries hfile=" + hfile + - " first=" + Bytes.toStringBinary(first) + - " last=" + Bytes.toStringBinary(last)); + LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" + + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last)); // To eventually infer start key-end key boundaries - Integer value = map.containsKey(first)? map.get(first):0; - map.put(first, value+1); + Integer value = map.containsKey(first) ? map.get(first) : 0; + map.put(first, value + 1); - value = map.containsKey(last)? map.get(last):0; - map.put(last, value-1); - } finally { - reader.close(); + value = map.containsKey(last) ? 
map.get(last) : 0; + map.put(last, value - 1); } } }); http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java index 366378a..3600fe0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MergeTableRegionsProcedure.java @@ -264,7 +264,7 @@ public class MergeTableRegionsProcedure @Override protected MergeTableRegionsState getState(final int stateId) { - return MergeTableRegionsState.valueOf(stateId); + return MergeTableRegionsState.forNumber(stateId); } @Override @@ -613,11 +613,8 @@ public class MergeTableRegionsProcedure final CacheConfig cacheConf = new CacheConfig(conf, hcd); for (StoreFileInfo storeFileInfo: storeFiles) { // Create reference file(s) of the region in mergedDir - regionFs.mergeStoreFile( - mergedRegionInfo, - family, - new StoreFile( - mfs.getFileSystem(), storeFileInfo, conf, cacheConf, hcd.getBloomFilterType()), + regionFs.mergeStoreFile(mergedRegionInfo, family, new StoreFile(mfs.getFileSystem(), + storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true), mergedDir); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java index 
3cd6c66..bf9afd7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java @@ -285,7 +285,7 @@ public class SplitTableRegionProcedure @Override protected SplitTableRegionState getState(final int stateId) { - return SplitTableRegionState.valueOf(stateId); + return SplitTableRegionState.forNumber(stateId); } @Override @@ -571,9 +571,9 @@ public class SplitTableRegionProcedure if (storeFiles != null && storeFiles.size() > 0) { final CacheConfig cacheConf = new CacheConfig(conf, hcd); for (StoreFileInfo storeFileInfo: storeFiles) { - StoreFileSplitter sfs = new StoreFileSplitter(regionFs, family.getBytes(), - new StoreFile(mfs.getFileSystem(), storeFileInfo, conf, - cacheConf, hcd.getBloomFilterType())); + StoreFileSplitter sfs = + new StoreFileSplitter(regionFs, family.getBytes(), new StoreFile(mfs.getFileSystem(), + storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true)); futures.add(threadPool.submit(sfs)); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java index 7c4d6fe..90d1f2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/CachedMobFile.java @@ -44,7 +44,9 @@ public class CachedMobFile extends MobFile implements Comparable<CachedMobFile> public static CachedMobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf) throws IOException { - StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE); + // XXX: primaryReplica is only used for 
constructing the key of block cache so it is not a + // critical problem if we pass the wrong value, so here we always pass true. Need to fix later. + StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE, true); return new CachedMobFile(sf); } http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java index cd4c079..73355e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFile.java @@ -118,9 +118,7 @@ public class MobFile { * @throws IOException */ public void open() throws IOException { - if (sf.getReader() == null) { - sf.createReader(); - } + sf.initReader(); } /** @@ -146,7 +144,9 @@ public class MobFile { */ public static MobFile create(FileSystem fs, Path path, Configuration conf, CacheConfig cacheConf) throws IOException { - StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE); + // XXX: primaryReplica is only used for constructing the key of block cache so it is not a + // critical problem if we pass the wrong value, so here we always pass true. Need to fix later. 
+ StoreFile sf = new StoreFile(fs, path, conf, cacheConf, BloomType.NONE, true); return new MobFile(sf); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index eb75120..06c5001 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -333,7 +333,8 @@ public final class MobUtils { if (LOG.isDebugEnabled()) { LOG.debug(fileName + " is an expired file"); } - filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE)); + filesToClean + .add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true)); } } catch (Exception e) { LOG.error("Cannot parse the fileName " + fileName, e); @@ -372,7 +373,7 @@ public final class MobUtils { Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR)); Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME); FileSystem fs = mobRootDir.getFileSystem(conf); - return mobRootDir.makeQualified(fs); + return mobRootDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); } /** @@ -697,7 +698,7 @@ public final class MobUtils { return null; } Path dstPath = new Path(targetPath, sourceFile.getName()); - validateMobFile(conf, fs, sourceFile, cacheConfig); + validateMobFile(conf, fs, sourceFile, cacheConfig, true); String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath; LOG.info(msg); Path parent = dstPath.getParent(); @@ -718,11 +719,11 @@ public final class MobUtils { * @param cacheConfig The current cache config. 
*/ private static void validateMobFile(Configuration conf, FileSystem fs, Path path, - CacheConfig cacheConfig) throws IOException { + CacheConfig cacheConfig, boolean primaryReplica) throws IOException { StoreFile storeFile = null; try { - storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE); - storeFile.createReader(); + storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE, primaryReplica); + storeFile.initReader(); } catch (IOException e) { LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e); throw e; http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java index 987fe51..05c7076 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java @@ -223,12 +223,9 @@ public class PartitionedMobCompactor extends MobCompactor { // File in the Del Partition List // Get delId from the file - Reader reader = HFile.createReader(fs, linkedFile.getPath(), CacheConfig.DISABLED, conf); - try { + try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) { delId.setStartKey(reader.getFirstRowKey()); delId.setEndKey(reader.getLastRowKey()); - } finally { - reader.close(); } CompactionDelPartition delPartition = delFilesToCompact.get(delId); if (delPartition == null) { @@ -267,12 +264,9 @@ public class PartitionedMobCompactor extends MobCompactor { if (withDelFiles) { // get startKey and endKey from the file and update partition // TODO: is it possible to skip read of most hfiles? 
- Reader reader = HFile.createReader(fs, linkedFile.getPath(), CacheConfig.DISABLED, conf); - try { + try (Reader reader = HFile.createReader(fs, linkedFile.getPath(), conf)) { compactionPartition.setStartKey(reader.getFirstRowKey()); compactionPartition.setEndKey(reader.getLastRowKey()); - } finally { - reader.close(); } } @@ -340,10 +334,11 @@ public class PartitionedMobCompactor extends MobCompactor { try { for (CompactionDelPartition delPartition : request.getDelPartitions()) { for (Path newDelPath : delPartition.listDelFiles()) { - StoreFile sf = new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE); + StoreFile sf = + new StoreFile(fs, newDelPath, conf, compactionCacheConfig, BloomType.NONE, true); // pre-create reader of a del file to avoid race condition when opening the reader in each // partition. - sf.createReader(); + sf.initReader(); delPartition.addStoreFile(sf); totalDelFileCount++; } @@ -557,7 +552,7 @@ public class PartitionedMobCompactor extends MobCompactor { List<StoreFile> filesToCompact = new ArrayList<>(); for (int i = offset; i < batch + offset; i++) { StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig, - BloomType.NONE); + BloomType.NONE, true); filesToCompact.add(sf); } filesToCompact.addAll(delFiles); @@ -739,7 +734,7 @@ public class PartitionedMobCompactor extends MobCompactor { } for (int i = offset; i < batch + offset; i++) { batchedDelFiles.add(new StoreFile(fs, delFilePaths.get(i), conf, compactionCacheConfig, - BloomType.NONE)); + BloomType.NONE, true)); } // compact the del files in a batch. 
paths.add(compactDelFilesInBatch(request, batchedDelFiles)); @@ -809,8 +804,8 @@ public class PartitionedMobCompactor extends MobCompactor { */ private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType) throws IOException { - List scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, true, false, - false, HConstants.LATEST_TIMESTAMP); + List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, + false, true, false, false, HConstants.LATEST_TIMESTAMP); Scan scan = new Scan(); scan.setMaxVersions(column.getMaxVersions()); long ttl = HStore.determineTTLFromFamily(column); @@ -893,7 +888,8 @@ public class PartitionedMobCompactor extends MobCompactor { for (StoreFile sf : storeFiles) { // the readers will be closed later after the merge. maxSeqId = Math.max(maxSeqId, sf.getMaxSequenceId()); - byte[] count = sf.createReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT); + sf.initReader(); + byte[] count = sf.getReader().loadFileInfo().get(StoreFile.MOB_CELLS_COUNT); if (count != null) { maxKeyCount += Bytes.toLong(count); } http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index c37ae99..da25df5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -135,7 +135,7 @@ class DefaultStoreFileManager implements StoreFileManager { this.compactedfiles = sortCompactedfiles(updatedCompactedfiles); } - // Mark the files as compactedAway once the storefiles and compactedfiles list 
is finalised + // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized // Let a background thread close the actual reader on these compacted files and also // ensure to evict the blocks from block cache so that they are no longer in // cache http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index b021430..032e383 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -292,9 +292,9 @@ public class HMobStore extends HStore { private void validateMobFile(Path path) throws IOException { StoreFile storeFile = null; try { - storeFile = - new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE); - storeFile.createReader(); + storeFile = new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, + BloomType.NONE, isPrimaryReplicaStore()); + storeFile.initReader(); } catch (IOException e) { LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e); throw e; http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 78ce608..b21a84d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ 
-4160,8 +4160,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName(); Set<StoreFile> fakeStoreFiles = new HashSet<>(files.size()); for (Path file: files) { - fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, - null, null)); + fakeStoreFiles.add( + new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, null, null, true)); } getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles); } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 144f43b..014427d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -294,7 +294,7 @@ public class HRegionFileSystem { */ Path getStoreFilePath(final String familyName, final String fileName) { Path familyDir = getStoreDir(familyName); - return new Path(familyDir, fileName).makeQualified(this.fs); + return new Path(familyDir, fileName).makeQualified(fs.getUri(), fs.getWorkingDirectory()); } /** @@ -675,9 +675,7 @@ public class HRegionFileSystem { if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) { // Check whether the split row lies in the range of the store file // If it is outside the range, return directly. - if (f.getReader() == null) { - f.createReader(); - } + f.initReader(); try { if (top) { //check if larger than last key. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index a98f89e..cbdaa1b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -650,13 +650,11 @@ public class HStore implements Store { return createStoreFileAndReader(info); } - private StoreFile createStoreFileAndReader(final StoreFileInfo info) - throws IOException { + private StoreFile createStoreFileAndReader(final StoreFileInfo info) throws IOException { info.setRegionCoprocessorHost(this.region.getCoprocessorHost()); StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf, - this.family.getBloomFilterType()); - StoreFileReader r = storeFile.createReader(); - r.setReplicaStoreFile(isPrimaryReplicaStore()); + this.family.getBloomFilterType(), isPrimaryReplicaStore()); + storeFile.initReader(); return storeFile; } @@ -726,8 +724,8 @@ public class HStore implements Store { try { LOG.info("Validating hfile at " + srcPath + " for inclusion in " + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString()); - reader = HFile.createReader(srcPath.getFileSystem(conf), - srcPath, cacheConf, conf); + reader = HFile.createReader(srcPath.getFileSystem(conf), srcPath, cacheConf, + isPrimaryReplicaStore(), conf); reader.loadFileInfo(); byte[] firstKey = reader.getFirstRowKey(); @@ -1180,7 +1178,7 @@ public class HStore implements Store { // but now we get them in ascending order, which I think is // actually more correct, since memstore get put at the end. 
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan, - cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore()); + cacheBlocks, usePread, isCompaction, false, matcher, readPt); List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1); scanners.addAll(sfScanners); // Then the memstore scanners @@ -1203,7 +1201,7 @@ public class HStore implements Store { } } List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files, - cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore()); + cacheBlocks, usePread, isCompaction, false, matcher, readPt); List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1); scanners.addAll(sfScanners); // Then the memstore scanners @@ -2456,8 +2454,9 @@ public class HStore implements Store { LOG.debug("The file " + file + " was closed but still not archived."); } filesToRemove.add(file); + continue; } - if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) { + if (file.isCompactedAway() && !file.isReferencedInReads()) { // Even if deleting fails we need not bother as any new scanners won't be // able to use the compacted file as the status is already compactedAway if (LOG.isTraceEnabled()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index 41c13f5..d71af2b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -54,7 +54,7 @@ class ReversedStoreScanner extends StoreScanner 
implements KeyValueScanner { /** Constructor for testing. */ ReversedStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, - final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners) + final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners) throws IOException { super(scan, scanInfo, scanType, columns, scanners, HConstants.LATEST_TIMESTAMP); http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 7aef05e..c53fbf08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -30,6 +30,7 @@ import java.util.Comparator; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,7 +55,7 @@ import org.apache.hadoop.hbase.util.Bytes; * and append data. Be sure to add any metadata before calling close on the * Writer (Use the appendMetadata convenience methods). On close, a StoreFile * is sitting in the Filesystem. To refer to it, create a StoreFile instance - * passing filesystem and path. To read, call {@link #createReader()}. + * passing filesystem and path. To read, call {@link #initReader()} * <p>StoreFiles may also reference store files in another Store. 
* * The reason for this weird pattern where you use a different instance for the @@ -64,6 +65,10 @@ import org.apache.hadoop.hbase.util.Bytes; public class StoreFile { private static final Log LOG = LogFactory.getLog(StoreFile.class.getName()); + public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead"; + + private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false; + // Keys for fileinfo values in HFile /** Max Sequence ID in FileInfo */ @@ -103,6 +108,18 @@ public class StoreFile { // Block cache configuration and reference. private final CacheConfig cacheConf; + // Counter that is incremented every time a scanner is created on the + // store file. It is decremented when the scan on the store file is + // done. + private final AtomicInteger refCount = new AtomicInteger(0); + + private final boolean noReadahead; + + private final boolean primaryReplica; + + // Indicates if the file got compacted + private volatile boolean compactedAway = false; + // Keys for metadata stored in backing HFile. // Set when we obtain a Reader. private long sequenceid = -1; @@ -116,7 +133,7 @@ public class StoreFile { private Cell lastKey; - private Comparator comparator; + private Comparator<Cell> comparator; CacheConfig getCacheConf() { return cacheConf; @@ -130,7 +147,7 @@ public class StoreFile { return lastKey; } - public Comparator getComparator() { + public Comparator<Cell> getComparator() { return comparator; } @@ -179,72 +196,96 @@ public class StoreFile { public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID"); /** - * Constructor, loads a reader and it's indices, etc. May allocate a - * substantial amount of ram depending on the underlying files (10-20MB?). - * - * @param fs The current file system to use. - * @param p The path of the file. - * @param conf The current configuration. - * @param cacheConf The cache configuration and block cache reference. 
- * @param cfBloomType The bloom type to use for this store file as specified - * by column family configuration. This may or may not be the same - * as the Bloom filter type actually present in the HFile, because - * column family configuration might change. If this is + * Constructor. Does not open the reader; call {@link #initReader()} to load the reader and its + * indices, which may allocate a substantial amount of ram (10-20MB?). + * @param fs The current file system to use. + * @param p The path of the file. + * @param conf The current configuration. + * @param cacheConf The cache configuration and block cache reference. + * @param cfBloomType The bloom type to use for this store file as specified by column family + * configuration. This may or may not be the same as the Bloom filter type actually + * present in the HFile, because column family configuration might change. If this is * {@link BloomType#NONE}, the existing Bloom filter is ignored. - * @throws IOException When opening the reader fails. + * @deprecated Whether the StoreFile is for the primary replica is now specified when + * constructing, so please use + * {@link #StoreFile(FileSystem, Path, Configuration, CacheConfig, BloomType, boolean)} + * directly. */ + @Deprecated public StoreFile(final FileSystem fs, final Path p, final Configuration conf, - final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException { + final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException { this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType); } /** - * Constructor, loads a reader and it's indices, etc. May allocate a - * substantial amount of ram depending on the underlying files (10-20MB?). - * - * @param fs The current file system to use. - * @param fileInfo The store file information. - * @param conf The current configuration. - * @param cacheConf The cache configuration and block cache reference.
- * @param cfBloomType The bloom type to use for this store file as specified - * by column family configuration. This may or may not be the same - * as the Bloom filter type actually present in the HFile, because - * column family configuration might change. If this is + * Constructor. Does not open the reader; call {@link #initReader()} to load the reader and its + * indices, which may allocate a substantial amount of ram (10-20MB?). + * @param fs The current file system to use. + * @param p The path of the file. + * @param conf The current configuration. + * @param cacheConf The cache configuration and block cache reference. + * @param cfBloomType The bloom type to use for this store file as specified by column family + * configuration. This may or may not be the same as the Bloom filter type actually + * present in the HFile, because column family configuration might change. If this is + * {@link BloomType#NONE}, the existing Bloom filter is ignored. + * @param primaryReplica true if this is a store file for primary replica, otherwise false. + * @throws IOException if the {@link StoreFileInfo} for the given path cannot be created. + */ + public StoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf, + BloomType cfBloomType, boolean primaryReplica) throws IOException { + this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType, primaryReplica); + } + + /** + * Constructor. Does not open the reader; call {@link #initReader()} to load the reader and its + * indices, which may allocate a substantial amount of ram (10-20MB?). + * @param fs The current file system to use. + * @param fileInfo The store file information. + * @param conf The current configuration. + * @param cacheConf The cache configuration and block cache reference. + * @param cfBloomType The bloom type to use for this store file as specified by column family + * configuration. This may or may not be the same as the Bloom filter type actually + * present in the HFile, because column family configuration might change.
If this is * {@link BloomType#NONE}, the existing Bloom filter is ignored. - * @throws IOException When opening the reader fails. + * @deprecated Whether the StoreFile is for the primary replica is now specified when + * constructing, so please use + * {@link #StoreFile(FileSystem, StoreFileInfo, Configuration, CacheConfig, BloomType, boolean)} + * directly. */ + @Deprecated public StoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf, - final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException { + final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException { + this(fs, fileInfo, conf, cacheConf, cfBloomType, true); + } + + /** + * Constructor. Does not open the reader; call {@link #initReader()} to load the reader and its + * indices, which may allocate a substantial amount of ram (10-20MB?). + * @param fs The current file system to use. + * @param fileInfo The store file information. + * @param conf The current configuration. + * @param cacheConf The cache configuration and block cache reference. + * @param cfBloomType The bloom type to use for this store file as specified by column + * family configuration. This may or may not be the same as the Bloom filter type + * actually present in the HFile, because column family configuration might change. If + * this is {@link BloomType#NONE}, the existing Bloom filter is ignored. + * @param primaryReplica true if this is a store file for primary replica, otherwise false.
+ */ + public StoreFile(FileSystem fs, StoreFileInfo fileInfo, Configuration conf, CacheConfig cacheConf, + BloomType cfBloomType, boolean primaryReplica) { this.fs = fs; this.fileInfo = fileInfo; this.cacheConf = cacheConf; - + this.noReadahead = + conf.getBoolean(STORE_FILE_READER_NO_READAHEAD, DEFAULT_STORE_FILE_READER_NO_READAHEAD); if (BloomFilterFactory.isGeneralBloomEnabled(conf)) { this.cfBloomType = cfBloomType; } else { - LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " + - "cfBloomType=" + cfBloomType + " (disabled in config)"); + LOG.info("Ignoring bloom filter check for file " + this.getPath() + ": " + "cfBloomType=" + + cfBloomType + " (disabled in config)"); this.cfBloomType = BloomType.NONE; } - } - - /** - * Clone - * @param other The StoreFile to clone from - */ - public StoreFile(final StoreFile other) { - this.fs = other.fs; - this.fileInfo = other.fileInfo; - this.cacheConf = other.cacheConf; - this.cfBloomType = other.cfBloomType; - this.metadataMap = other.metadataMap; - } - - /** - * Clone a StoreFile for opening private reader. - */ - public StoreFile cloneForReader() { - return new StoreFile(this); + this.primaryReplica = primaryReplica; } /** @@ -266,12 +307,12 @@ public class StoreFile { * @return Returns the qualified path of this StoreFile */ public Path getQualifiedPath() { - return this.fileInfo.getPath().makeQualified(fs); + return this.fileInfo.getPath().makeQualified(fs.getUri(), fs.getWorkingDirectory()); } /** * @return True if this is a StoreFile Reference; call - * after {@link #open(boolean canUseDropBehind)} else may get wrong answer. + * after {@link #open()} else may get wrong answer. 
*/ public boolean isReference() { return this.fileInfo.isReference(); @@ -376,15 +417,21 @@ public class StoreFile { @VisibleForTesting public boolean isCompactedAway() { - if (this.reader != null) { - return this.reader.isCompactedAway(); - } - return true; + return compactedAway; } @VisibleForTesting public int getRefCount() { - return this.reader.getRefCount().get(); + return refCount.get(); + } + + /** + * @return true if the file is still used in reads + */ + public boolean isReferencedInReads() { + int rc = refCount.get(); + assert rc >= 0; // we should not go negative. + return rc > 0; } /** @@ -404,18 +451,18 @@ public class StoreFile { } /** - * Opens reader on this store file. Called by Constructor. - * @return Reader for the store file. + * Opens reader on this store file. Called by {@link #initReader()}. * @throws IOException * @see #closeReader(boolean) */ - private StoreFileReader open(boolean canUseDropBehind) throws IOException { + private void open() throws IOException { if (this.reader != null) { throw new IllegalAccessError("Already open"); } // Open the StoreFile.Reader - this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind); + this.reader = fileInfo.open(this.fs, this.cacheConf, false, noReadahead ? 0L : -1L, + primaryReplica, refCount, true); // Load up indices and fileinfo. This also loads Bloom filter type. metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo()); @@ -513,38 +560,45 @@ public class StoreFile { firstKey = reader.getFirstKey(); lastKey = reader.getLastKey(); comparator = reader.getComparator(); - return this.reader; - } - - public StoreFileReader createReader() throws IOException { - return createReader(false); } /** - * @return Reader for StoreFile. creates if necessary - * @throws IOException + * Initialize the reader used for pread.
*/ - public StoreFileReader createReader(boolean canUseDropBehind) throws IOException { - if (this.reader == null) { + public void initReader() throws IOException { + if (reader == null) { try { - this.reader = open(canUseDropBehind); - } catch (IOException e) { + open(); + } catch (Exception e) { try { - boolean evictOnClose = - cacheConf != null? cacheConf.shouldEvictOnClose(): true; + boolean evictOnClose = cacheConf != null ? cacheConf.shouldEvictOnClose() : true; this.closeReader(evictOnClose); } catch (IOException ee) { + LOG.warn("failed to close reader", ee); } throw e; } - } - return this.reader; + } + + private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException { + initReader(); + StoreFileReader reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind, -1L, + primaryReplica, refCount, false); + reader.copyFields(this.reader); + return reader; + } + + public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks, + boolean pread, boolean isCompaction, long readPt, long scannerOrder, + boolean canOptimizeForNonNullColumn) throws IOException { + return createStreamReader(canUseDropBehind).getStoreFileScanner( + cacheBlocks, pread, isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn); } /** - * @return Current reader. Must call createReader first else returns null. - * @see #createReader() + * @return Current reader. Must call initReader first else returns null. + * @see #initReader() */ public StoreFileReader getReader() { return this.reader; @@ -566,9 +620,7 @@ public class StoreFile { * Marks the status of the file as compactedAway. 
*/ public void markCompactedAway() { - if (this.reader != null) { - this.reader.markCompactedAway(); - } + this.compactedAway = true; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java index 3c12045..c4754a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java @@ -21,17 +21,18 @@ package org.apache.hadoop.hbase.regionserver; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.HalfStoreFileReader; @@ -233,25 +234,24 @@ public class StoreFileInfo { * @param cacheConf The cache configuration and block cache reference. 
* @return The StoreFile.Reader for the file */ - public StoreFileReader open(final FileSystem fs, - final CacheConfig cacheConf, final boolean canUseDropBehind) throws IOException { + public StoreFileReader open(FileSystem fs, CacheConfig cacheConf, boolean canUseDropBehind, + long readahead, boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared) + throws IOException { FSDataInputStreamWrapper in; FileStatus status; final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction(); if (this.link != null) { // HFileLink - in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind); + in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind, readahead); status = this.link.getFileStatus(fs); } else if (this.reference != null) { // HFile Reference Path referencePath = getReferredToFile(this.getPath()); - in = new FSDataInputStreamWrapper(fs, referencePath, - doDropBehind); + in = new FSDataInputStreamWrapper(fs, referencePath, doDropBehind, readahead); status = fs.getFileStatus(referencePath); } else { - in = new FSDataInputStreamWrapper(fs, this.getPath(), - doDropBehind); + in = new FSDataInputStreamWrapper(fs, this.getPath(), doDropBehind, readahead); status = fs.getFileStatus(initialPath); } long length = status.getLen(); @@ -265,9 +265,10 @@ public class StoreFileInfo { if (reader == null) { if (this.reference != null) { reader = new HalfStoreFileReader(fs, this.getPath(), in, length, cacheConf, reference, - conf); + isPrimaryReplicaStoreFile, refCount, shared, conf); } else { - reader = new StoreFileReader(fs, status.getPath(), in, length, cacheConf, conf); + reader = new StoreFileReader(fs, status.getPath(), in, length, cacheConf, + isPrimaryReplicaStoreFile, refCount, shared, conf); } } if (this.coprocessorHost != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java index 8f01a93..b015ea5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; + import java.io.DataInput; import java.io.IOException; import java.util.Map; @@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; @@ -68,36 +69,47 @@ public class StoreFileReader { private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null; private boolean skipResetSeqId = true; - public AtomicInteger getRefCount() { - return refCount; - } - // Counter that is incremented every time a scanner is created on the - // store file. It is decremented when the scan on the store file is - // done. - private AtomicInteger refCount = new AtomicInteger(0); - // Indicates if the file got compacted - private volatile boolean compactedAway = false; + // store file. It is decremented when the scan on the store file is + // done. All StoreFileReader for the same StoreFile will share this counter. 
+ private final AtomicInteger refCount; - public StoreFileReader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) - throws IOException { - reader = HFile.createReader(fs, path, cacheConf, conf); + // indicate that whether this StoreFileReader is shared, i.e., used for pread. If not, we will + // close the internal reader when readCompleted is called. + private final boolean shared; + + private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) { + this.reader = reader; bloomFilterType = BloomType.NONE; + this.refCount = refCount; + this.shared = shared; } - void markCompactedAway() { - this.compactedAway = true; + public StoreFileReader(FileSystem fs, Path path, CacheConfig cacheConf, + boolean primaryReplicaStoreFile, AtomicInteger refCount, boolean shared, Configuration conf) + throws IOException { + this(HFile.createReader(fs, path, cacheConf, primaryReplicaStoreFile, conf), refCount, shared); } public StoreFileReader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size, - CacheConfig cacheConf, Configuration conf) throws IOException { - reader = HFile.createReader(fs, path, in, size, cacheConf, conf); - bloomFilterType = BloomType.NONE; + CacheConfig cacheConf, boolean primaryReplicaStoreFile, AtomicInteger refCount, + boolean shared, Configuration conf) throws IOException { + this(HFile.createReader(fs, path, in, size, cacheConf, primaryReplicaStoreFile, conf), refCount, + shared); } - public void setReplicaStoreFile(boolean isPrimaryReplicaStoreFile) { - reader.setPrimaryReplicaReader(isPrimaryReplicaStoreFile); + void copyFields(StoreFileReader reader) { + this.generalBloomFilter = reader.generalBloomFilter; + this.deleteFamilyBloomFilter = reader.deleteFamilyBloomFilter; + this.bloomFilterType = reader.bloomFilterType; + this.sequenceID = reader.sequenceID; + this.timeRange = reader.timeRange; + this.lastBloomKey = reader.lastBloomKey; + this.bulkLoadResult = reader.bulkLoadResult; + 
this.lastBloomKeyOnlyKV = reader.lastBloomKeyOnlyKV; + this.skipResetSeqId = reader.skipResetSeqId; } + public boolean isPrimaryReplicaReader() { return reader.isPrimaryReplicaReader(); } @@ -105,8 +117,11 @@ public class StoreFileReader { /** * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS */ + @VisibleForTesting StoreFileReader() { + this.refCount = new AtomicInteger(0); this.reader = null; + this.shared = false; } public CellComparator getComparator() { @@ -128,30 +143,23 @@ public class StoreFileReader { boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) { // Increment the ref count refCount.incrementAndGet(); - return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, - reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn); + return new StoreFileScanner(this, getScanner(cacheBlocks, pread, isCompaction), + !isCompaction, reader.hasMVCCInfo(), readPt, scannerOrder, canOptimizeForNonNullColumn); } /** - * Decrement the ref count associated with the reader when ever a scanner associated - * with the reader is closed + * Indicate that the scanner has finished reading with this reader. We need to decrement the ref + * count, and also, if this is not the common pread reader, we should close it. 
*/ - void decrementRefCount() { + void readCompleted() { refCount.decrementAndGet(); - } - - /** - * @return true if the file is still used in reads - */ - public boolean isReferencedInReads() { - return refCount.get() != 0; - } - - /** - * @return true if the file is compacted - */ - public boolean isCompactedAway() { - return this.compactedAway; + if (!shared) { + try { + reader.close(false); + } catch (IOException e) { + LOG.warn("failed to close stream reader", e); + } + } } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index ab6b0ef..aa4f897 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -31,8 +31,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.TimeRange; @@ -124,26 +122,44 @@ public class StoreFileScanner implements KeyValueScanner { */ public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files, boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop, - ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException { + ScanQueryMatcher matcher, long readPt) throws IOException { List<StoreFileScanner> scanners = new 
ArrayList<>(files.size()); - List<StoreFile> sorted_files = new ArrayList<>(files); - Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID); - for (int i = 0; i < sorted_files.size(); i++) { - StoreFileReader r = sorted_files.get(i).createReader(canUseDrop); - r.setReplicaStoreFile(isPrimaryReplica); - StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, isCompaction, readPt, - i, matcher != null ? !matcher.hasNullColumnInQuery() : false); + List<StoreFile> sortedFiles = new ArrayList<>(files); + Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID); + for (int i = 0, n = sortedFiles.size(); i < n; i++) { + StoreFile sf = sortedFiles.get(i); + sf.initReader(); + StoreFileScanner scanner = sf.getReader().getStoreFileScanner(cacheBlocks, usePread, + isCompaction, readPt, i, matcher != null ? !matcher.hasNullColumnInQuery() : false); scanners.add(scanner); } return scanners; } - public static List<StoreFileScanner> getScannersForStoreFiles( - Collection<StoreFile> files, boolean cacheBlocks, boolean usePread, - boolean isCompaction, boolean canUseDrop, - ScanQueryMatcher matcher, long readPt) throws IOException { - return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, canUseDrop, - matcher, readPt, true); + /** + * Get scanners for compaction. We will create a separated reader for each store file to avoid + * contention with normal read request. 
+ */ + public static List<StoreFileScanner> getScannersForCompaction(Collection<StoreFile> files, + boolean canUseDropBehind, long readPt) throws IOException { + List<StoreFileScanner> scanners = new ArrayList<>(files.size()); + List<StoreFile> sortedFiles = new ArrayList<>(files); + Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID); + boolean succ = false; + try { + for (int i = 0, n = sortedFiles.size(); i < n; i++) { + scanners.add(sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, false, true, + readPt, i, false)); + } + succ = true; + } finally { + if (!succ) { + for (StoreFileScanner scanner : scanners) { + scanner.close(); + } + } + } + return scanners; } public String toString() { @@ -262,7 +278,7 @@ public class StoreFileScanner implements KeyValueScanner { cur = null; this.hfs.close(); if (this.reader != null) { - this.reader.decrementRefCount(); + this.reader.readCompleted(); } closed = true; } http://git-wip-us.apache.org/repos/asf/hbase/blob/66b616d7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 99ec30e..3bc6a0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -312,7 +312,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @VisibleForTesting StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, final NavigableSet<byte[]> columns, - final List<KeyValueScanner> scanners) throws IOException { + final List<? 
extends KeyValueScanner> scanners) throws IOException { this(scan, scanInfo, scanType, columns, scanners, HConstants.LATEST_TIMESTAMP, // 0 is passed as readpoint because the test bypasses Store @@ -322,7 +322,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @VisibleForTesting StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, final NavigableSet<byte[]> columns, - final List<KeyValueScanner> scanners, long earliestPutTs) + final List<? extends KeyValueScanner> scanners, long earliestPutTs) throws IOException { this(scan, scanInfo, scanType, columns, scanners, earliestPutTs, // 0 is passed as readpoint because the test bypasses Store @@ -330,7 +330,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } public StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, - final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners, long earliestPutTs, + final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners, long earliestPutTs, long readPt) throws IOException { this(null, scan, scanInfo, columns, readPt, scanType == ScanType.USER_SCAN ? scan.getCacheBlocks() : false);