This is an automated email from the ASF dual-hosted git repository. wchevreuil pushed a commit to branch HBASE-28463 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28463 by this push: new d3317caa5ff HBASE-28469: Integration of time-based priority caching into compaction paths (#5866) d3317caa5ff is described below commit d3317caa5ffe28324840a79661b23dd130d4daf9 Author: vinayak hegde <vinayakph...@gmail.com> AuthorDate: Wed May 22 18:59:49 2024 +0530 HBASE-28469: Integration of time-based priority caching into compaction paths (#5866) Signed-off-by: Wellington Chevreuil <wchevre...@apache.org> Reviewed-by: Janardhan Hugund <janardhan.hung...@cloudera.com> --- .../apache/hadoop/hbase/io/hfile/BlockCache.java | 17 ++ .../hadoop/hbase/io/hfile/BlockCacheKey.java | 1 - .../hadoop/hbase/io/hfile/CombinedBlockCache.java | 20 ++- .../org/apache/hadoop/hbase/io/hfile/HFile.java | 5 + .../hadoop/hbase/io/hfile/HFileWriterImpl.java | 53 +++++- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 13 ++ .../hbase/regionserver/DataTieringManager.java | 50 +++++- .../hbase/regionserver/HRegionFileSystem.java | 25 +++ .../hadoop/hbase/regionserver/StoreFileWriter.java | 23 +-- .../hbase/regionserver/TimeRangeTracker.java | 4 +- .../hbase/regionserver/TestDataTieringManager.java | 183 ++++++++++++++++++--- 11 files changed, 336 insertions(+), 58 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index ac83af1053a..922ac5dd144 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Optional; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @@ -207,6 +208,22 @@ public interface BlockCache extends Iterable<CachedBlock> { return Optional.empty(); } + /** + * Checks whether the block represented by the given key should be cached or not. This method may + * not be overridden by all implementing classes. In such cases, the returned Optional will be + * empty. For subclasses implementing this logic, the returned Optional would contain the boolean + * value reflecting if the passed block should indeed be cached. + * @param key The key representing the block to check if it should be cached. + * @param timeRangeTracker the time range tracker containing the timestamps + * @param conf The configuration object to use for determining caching behavior. + * @return An empty Optional if this method is not supported; otherwise, the returned Optional + * contains the boolean value indicating if the block should be cached. + */ + default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker, + Configuration conf) { + return Optional.empty(); + } + /** * Checks whether the block for the passed key is already cached. This method may not be * overridden by all implementing classes. In such cases, the returned Optional will be empty. For diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java index bf22d38e373..bcc1f58ba5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java @@ -116,5 +116,4 @@ public class BlockCacheKey implements HeapSize, java.io.Serializable { public Path getFilePath() { return filePath; } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index b12510cdccd..c29ed1ecf31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -484,11 +485,22 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize { @Override public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf) { - Optional<Boolean> l1Result = l1Cache.shouldCacheFile(hFileInfo, conf); - Optional<Boolean> l2Result = l2Cache.shouldCacheFile(hFileInfo, conf); + return combineCacheResults(l1Cache.shouldCacheFile(hFileInfo, conf), + l2Cache.shouldCacheFile(hFileInfo, conf)); + } + + @Override + public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker, + Configuration conf) { + return combineCacheResults(l1Cache.shouldCacheBlock(key, timeRangeTracker, conf), + l2Cache.shouldCacheBlock(key, timeRangeTracker, conf)); + } + + private Optional<Boolean> combineCacheResults(Optional<Boolean> result1, + Optional<Boolean> result2) { final Mutable<Boolean> combinedResult = new MutableBoolean(true); - l1Result.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue())); - l2Result.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue())); + result1.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue())); + result2.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue())); return Optional.of(combinedResult.getValue()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index ae79ad85724..2c3908aa33f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -212,6 +212,11 @@ public final class HFile { /** Add an element to the file info map. */ void appendFileInfo(byte[] key, byte[] value) throws IOException; + /** + * Add TimestampRange and earliest put timestamp to Metadata + */ + void appendTrackedTimestampsToMetadata() throws IOException; + /** Returns the path to this {@link HFile} */ Path getPath(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index d2dfaf62106..cb7d0509cd9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; import java.io.DataOutput; import java.io.DataOutputStream; @@ -26,6 +28,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -45,6 +48,7 @@ import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding; import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.BloomFilterWriter; @@ -117,6 +121,8 @@ public class HFileWriterImpl implements HFile.Writer { /** May be null if we were passed a stream. */ protected final Path path; + protected final Configuration conf; + /** Cache configuration for caching data on write. */ protected final CacheConfig cacheConf; @@ -170,12 +176,16 @@ public class HFileWriterImpl implements HFile.Writer { protected long maxMemstoreTS = 0; + private final TimeRangeTracker timeRangeTracker; + private long earliestPutTs = HConstants.LATEST_TIMESTAMP; + public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path, FSDataOutputStream outputStream, HFileContext fileContext) { this.outputStream = outputStream; this.path = path; this.name = path != null ? path.getName() : outputStream.toString(); this.hFileContext = fileContext; + this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); DataBlockEncoding encoding = hFileContext.getDataBlockEncoding(); if (encoding != DataBlockEncoding.NONE) { this.blockEncoder = new HFileDataBlockEncoderImpl(encoding); @@ -190,6 +200,7 @@ public class HFileWriterImpl implements HFile.Writer { } closeOutputStream = path != null; this.cacheConf = cacheConf; + this.conf = conf; float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f); this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio); @@ -555,9 +566,12 @@ public class HFileWriterImpl implements HFile.Writer { private void doCacheOnWrite(long offset) { cacheConf.getBlockCache().ifPresent(cache -> { HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf); + BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType()); + if (!shouldCacheBlock(cache, key)) { + return; + } try { - cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()), - cacheFormatBlock, cacheConf.isInMemory(), true); + cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true); } finally { // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent cacheFormatBlock.release(); @@ -565,6 +579,18 @@ public class HFileWriterImpl implements HFile.Writer { }); } + private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) { + if (path != null) { + return new BlockCacheKey(path, offset, true, blockType); + } + return new BlockCacheKey(name, offset, true, blockType); + } + + private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) { + Optional<Boolean> result = cache.shouldCacheBlock(key, timeRangeTracker, conf); + return result.orElse(true); + } + /** * Ready a new block for writing. */ @@ -767,6 +793,8 @@ public class HFileWriterImpl implements HFile.Writer { if (tagsLength > this.maxTagsLength) { this.maxTagsLength = tagsLength; } + + trackTimestamps(cell); } @Override @@ -859,4 +887,25 @@ public class HFileWriterImpl implements HFile.Writer { outputStream = null; } } + + /** + * Add TimestampRange and earliest put timestamp to Metadata + */ + public void appendTrackedTimestampsToMetadata() throws IOException { + // TODO: The StoreFileReader always converts the byte[] to TimeRange + // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. + appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker)); + appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); + } + + /** + * Record the earliest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker + * to include the timestamp of this key + */ + private void trackTimestamps(final Cell cell) { + if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) { + earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp()); + } + timeRangeTracker.includeTimestamp(cell); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 0b53d047990..c14cf76ae94 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; import org.apache.hadoop.hbase.regionserver.DataTieringManager; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.IdReadWriteLock; @@ -2203,6 +2204,18 @@ public class BucketCache implements BlockCache, HeapSize { return Optional.of(!fullyCachedFiles.containsKey(fileName)); } + @Override + public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker, + Configuration conf) { + DataTieringManager dataTieringManager = DataTieringManager.getInstance(); + if (dataTieringManager != null && !dataTieringManager.isHotData(timeRangeTracker, conf)) { + LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data", + key.getHfileName()); + return Optional.of(false); + } + return Optional.of(true); + } + @Override public Optional<Boolean> isAlreadyCached(BlockCacheKey key) { return Optional.of(getBackingMap().containsKey(key)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java index 952b4d4938d..aa56e3f6444 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java @@ -131,6 +131,27 @@ public class DataTieringManager { return isHotData(hFilePath); } + /** + * Determines whether the data associated with the given time range tracker is considered hot. If + * the data tiering type is set to {@link DataTieringType#TIME_RANGE}, it uses the maximum + * timestamp from the time range tracker to determine if the data is hot. Otherwise, it considers + * the data as hot by default. + * @param timeRangeTracker the time range tracker containing the timestamps + * @param conf The configuration object to use for determining hot data criteria. + * @return {@code true} if the data is hot, {@code false} otherwise + */ + public boolean isHotData(TimeRangeTracker timeRangeTracker, Configuration conf) { + DataTieringType dataTieringType = getDataTieringType(conf); + if ( + dataTieringType.equals(DataTieringType.TIME_RANGE) + && timeRangeTracker.getMax() != TimeRangeTracker.INITIAL_MAX_TIMESTAMP + ) { + return hotDataValidator(timeRangeTracker.getMax(), getDataTieringHotDataAge(conf)); + } + // DataTieringType.NONE or other types are considered hot by default + return true; + } + /** * Determines whether the data in the HFile at the given path is considered hot based on the * configured data tiering type and hot data age. If the data tiering type is set to @@ -151,6 +172,27 @@ public class DataTieringManager { return true; } + /** + * Determines whether the data in the HFile at the given path is considered hot based on the + * configured data tiering type and hot data age. If the data tiering type is set to + * {@link DataTieringType#TIME_RANGE}, it validates the data against the provided maximum + * timestamp. + * @param hFilePath the path to the HFile + * @param maxTimestamp the maximum timestamp to validate against + * @return {@code true} if the data is hot, {@code false} otherwise + * @throws DataTieringException if there is an error retrieving data tiering information + */ + public boolean isHotData(Path hFilePath, long maxTimestamp) throws DataTieringException { + Configuration configuration = getConfiguration(hFilePath); + DataTieringType dataTieringType = getDataTieringType(configuration); + + if (dataTieringType.equals(DataTieringType.TIME_RANGE)) { + return hotDataValidator(maxTimestamp, getDataTieringHotDataAge(configuration)); + } + // DataTieringType.NONE or other types are considered hot by default + return true; + } + /** * Determines whether the data in the HFile being read is considered hot based on the configured * data tiering type and hot data age. If the data tiering type is set to @@ -231,10 +273,12 @@ public class DataTieringManager { } private HRegion getHRegion(Path hFilePath) throws DataTieringException { - if (hFilePath.getParent() == null || hFilePath.getParent().getParent() == null) { - throw new DataTieringException("Incorrect HFile Path: " + hFilePath); + String regionId; + try { + regionId = HRegionFileSystem.getRegionId(hFilePath); + } catch (IOException e) { + throw new DataTieringException(e.getMessage()); } - String regionId = hFilePath.getParent().getParent().getName(); HRegion hRegion = this.onlineRegions.get(regionId); if (hRegion == null) { throw new DataTieringException("HRegion corresponding to " + hFilePath + " doesn't exist"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 6fccccfc820..88b612e4ac5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -1067,6 +1067,31 @@ public class HRegionFileSystem { } } + /** + * Retrieves the Region ID from the given HFile path. + * @param hFilePath The path of the HFile. + * @return The Region ID extracted from the HFile path. + * @throws IOException If an I/O error occurs or if the HFile path is incorrect. + */ + public static String getRegionId(Path hFilePath) throws IOException { + if (hFilePath.getParent() == null || hFilePath.getParent().getParent() == null) { + throw new IOException("Incorrect HFile Path: " + hFilePath); + } + Path dir = hFilePath.getParent().getParent(); + if (isTemporaryDirectoryName(dir.getName())) { + if (dir.getParent() == null) { + throw new IOException("Incorrect HFile Path: " + hFilePath); + } + return dir.getParent().getName(); + } + return dir.getName(); + } + + private static boolean isTemporaryDirectoryName(String dirName) { + return REGION_MERGES_DIR.equals(dirName) || REGION_SPLITS_DIR.equals(dirName) + || REGION_TEMP_DIR.equals(dirName); + } + /** * Creates a directory. Assumes the user has already checked for this directory existence. * @return the result of fs.mkdirs(). In case underlying fs throws an IOException, it checks diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java index 67fa2244e95..2f0fd4cfe54 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java @@ -22,13 +22,11 @@ import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT; -import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS; import static org.apache.hadoop.hbase.regionserver.HStoreFile.HISTORICAL_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS; -import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; import static org.apache.hadoop.hbase.regionserver.StoreEngine.STORE_ENGINE_CLASS_KEY; import java.io.IOException; @@ -51,7 +49,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; @@ -495,11 +492,9 @@ public class StoreFileWriter implements CellSink, ShipperListener { private final BloomFilterWriter deleteFamilyBloomFilterWriter; private final BloomType bloomType; private byte[] bloomParam = null; - private long earliestPutTs = HConstants.LATEST_TIMESTAMP; private long deleteFamilyCnt = 0; private BloomContext bloomContext = null; private BloomContext deleteFamilyBloomContext = null; - private final TimeRangeTracker timeRangeTracker; private final Supplier<Collection<HStoreFile>> compactedFilesSupplier; private HFile.Writer writer; @@ -523,7 +518,6 @@ public class StoreFileWriter implements CellSink, ShipperListener { HFileContext fileContext, boolean shouldDropCacheBehind, Supplier<Collection<HStoreFile>> compactedFilesSupplier) throws IOException { this.compactedFilesSupplier = compactedFilesSupplier; - this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); // TODO : Change all writers to be specifically created for compaction context writer = HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes) @@ -665,21 +659,7 @@ public class StoreFileWriter implements CellSink, ShipperListener { * Add TimestampRange and earliest put timestamp to Metadata */ private void appendTrackedTimestampsToMetadata() throws IOException { - // TODO: The StoreFileReader always converts the byte[] to TimeRange - // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. - appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker)); - appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); - } - - /** - * Record the earlest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker - * to include the timestamp of this key - */ - private void trackTimestamps(final Cell cell) { - if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) { - earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp()); - } - timeRangeTracker.includeTimestamp(cell); + writer.appendTrackedTimestampsToMetadata(); } private void appendGeneralBloomfilter(final Cell cell) throws IOException { @@ -710,7 +690,6 @@ public class StoreFileWriter implements CellSink, ShipperListener { appendGeneralBloomfilter(cell); appendDeleteFamilyBloomFilter(cell); writer.append(cell); - trackTimestamps(cell); } private void beforeShipped() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java index 51807658f2a..af15ebcc126 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java @@ -53,8 +53,8 @@ public abstract class TimeRangeTracker { SYNC } - static final long INITIAL_MIN_TIMESTAMP = Long.MAX_VALUE; - static final long INITIAL_MAX_TIMESTAMP = -1L; + public static final long INITIAL_MIN_TIMESTAMP = Long.MAX_VALUE; + public static final long INITIAL_MAX_TIMESTAMP = -1L; public static TimeRangeTracker create(Type type) { switch (type) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java index f999a73c473..fbd88a6f58b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java @@ -32,6 +32,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -97,6 +98,7 @@ public class TestDataTieringManager { private static final Logger LOG = LoggerFactory.getLogger(TestDataTieringManager.class); private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final long DAY = 24 * 60 * 60 * 1000; private static Configuration defaultConf; private static FileSystem fs; private static BlockCache blockCache; @@ -107,20 +109,27 @@ public class TestDataTieringManager { private static DataTieringManager dataTieringManager; private static final List<HStoreFile> hStoreFiles = new ArrayList<>(); + /** + * Represents the current lexicographically increasing string used as a row key when writing + * HFiles. It is incremented each time {@link #nextString()} is called to generate unique row + * keys. + */ + private static String rowKeyString; + @BeforeClass public static void setupBeforeClass() throws Exception { testDir = TEST_UTIL.getDataTestDir(TestDataTieringManager.class.getSimpleName()); defaultConf = TEST_UTIL.getConfiguration(); - defaultConf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); - defaultConf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); - defaultConf.setLong(BUCKET_CACHE_SIZE_KEY, 32); - defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, true); - fs = HFileSystem.get(defaultConf); - blockCache = BlockCacheFactory.createBlockCache(defaultConf); - cacheConf = new CacheConfig(defaultConf, blockCache); + updateCommonConfigurations(); assertTrue(DataTieringManager.instantiate(defaultConf, testOnlineRegions)); - setupOnlineRegions(); dataTieringManager = DataTieringManager.getInstance(); + rowKeyString = ""; + } + + private static void updateCommonConfigurations() { + defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, true); + defaultConf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); + defaultConf.setLong(BUCKET_CACHE_SIZE_KEY, 32); } @FunctionalInterface @@ -134,7 +143,8 @@ public class TestDataTieringManager { } @Test - public void testDataTieringEnabledWithKey() { + public void testDataTieringEnabledWithKey() throws IOException { + initializeTestEnvironment(); DataTieringMethodCallerWithKey methodCallerWithKey = DataTieringManager::isDataTieringEnabled; // Test with valid key @@ -152,7 +162,8 @@ public class TestDataTieringManager { } @Test - public void testDataTieringEnabledWithPath() { + public void testDataTieringEnabledWithPath() throws IOException { + initializeTestEnvironment(); DataTieringMethodCallerWithPath methodCallerWithPath = DataTieringManager::isDataTieringEnabled; // Test with valid path @@ -182,7 +193,8 @@ public class TestDataTieringManager { } @Test - public void testHotDataWithKey() { + public void testHotDataWithKey() throws IOException { + initializeTestEnvironment(); DataTieringMethodCallerWithKey methodCallerWithKey = DataTieringManager::isHotData; // Test with valid key @@ -195,7 +207,8 @@ public class TestDataTieringManager { } @Test - public void testHotDataWithPath() { + public void testHotDataWithPath() throws IOException { + initializeTestEnvironment(); DataTieringMethodCallerWithPath methodCallerWithPath = DataTieringManager::isHotData; // Test with valid path @@ -213,6 +226,8 @@ public class TestDataTieringManager { @Test public void testPrefetchWhenDataTieringEnabled() throws IOException { + setPrefetchBlocksOnOpen(); + initializeTestEnvironment(); // Evict blocks from cache by closing the files and passing evict on close. // Then initialize the reader again. Since Prefetch on open is set to true, it should prefetch // those blocks. @@ -224,12 +239,17 @@ public class TestDataTieringManager { // Since we have one cold file among four files, only three should get prefetched. Optional<Map<String, Pair<String, Long>>> fullyCachedFiles = blockCache.getFullyCachedFiles(); assertTrue("We should get the fully cached files from the cache", fullyCachedFiles.isPresent()); - Waiter.waitFor(defaultConf, 60000, () -> fullyCachedFiles.get().size() == 3); + Waiter.waitFor(defaultConf, 10000, () -> fullyCachedFiles.get().size() == 3); assertEquals("Number of fully cached files are incorrect", 3, fullyCachedFiles.get().size()); } + private void setPrefetchBlocksOnOpen() { + defaultConf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); + } + @Test - public void testColdDataFiles() { + public void testColdDataFiles() throws IOException { + initializeTestEnvironment(); Set<BlockCacheKey> allCachedBlocks = new HashSet<>(); for (HStoreFile file : hStoreFiles) { allCachedBlocks.add(new BlockCacheKey(file.getPath(), 0, true, BlockType.DATA)); @@ -255,7 +275,71 @@ public class TestDataTieringManager { } @Test - public void testPickColdDataFiles() { + public void testCacheCompactedBlocksOnWriteDataTieringDisabled() throws IOException { + setCacheCompactBlocksOnWrite(); + initializeTestEnvironment(); + + HRegion region = createHRegion("table3"); + testCacheCompactedBlocksOnWrite(region, true); + } + + @Test + public void testCacheCompactedBlocksOnWriteWithHotData() throws IOException { + setCacheCompactBlocksOnWrite(); + initializeTestEnvironment(); + + HRegion region = createHRegion("table3", getConfWithTimeRangeDataTieringEnabled(5 * DAY)); + testCacheCompactedBlocksOnWrite(region, true); + } + + @Test + public void testCacheCompactedBlocksOnWriteWithColdData() throws IOException { + setCacheCompactBlocksOnWrite(); + initializeTestEnvironment(); + + HRegion region = createHRegion("table3", getConfWithTimeRangeDataTieringEnabled(DAY)); + testCacheCompactedBlocksOnWrite(region, false); + } + + private void setCacheCompactBlocksOnWrite() { + defaultConf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, true); + } + + private void testCacheCompactedBlocksOnWrite(HRegion region, boolean expectDataBlocksCached) + throws IOException { + HStore hStore = createHStore(region, "cf1"); + createTestFilesForCompaction(hStore); + hStore.refreshStoreFiles(); + + region.stores.put(Bytes.toBytes("cf1"), hStore); + testOnlineRegions.put(region.getRegionInfo().getEncodedName(), region); + + long initialStoreFilesCount = hStore.getStorefilesCount(); + long initialCacheDataBlockCount = blockCache.getDataBlockCount(); + assertEquals(3, initialStoreFilesCount); + assertEquals(0, initialCacheDataBlockCount); + + region.compact(true); + + long compactedStoreFilesCount = hStore.getStorefilesCount(); + long compactedCacheDataBlockCount = blockCache.getDataBlockCount(); + assertEquals(1, compactedStoreFilesCount); + assertEquals(expectDataBlocksCached, compactedCacheDataBlockCount > 0); + } + + private void createTestFilesForCompaction(HStore hStore) throws IOException { + long currentTime = System.currentTimeMillis(); + Path storeDir = hStore.getStoreContext().getFamilyStoreDirectoryPath(); + Configuration configuration = hStore.getReadOnlyConfiguration(); + + createHStoreFile(storeDir, configuration, currentTime - 2 * DAY); + createHStoreFile(storeDir, configuration, currentTime - 3 * DAY); + createHStoreFile(storeDir, configuration, currentTime - 4 * DAY); + } + + @Test + public void testPickColdDataFiles() throws IOException { + initializeTestEnvironment(); Map<String, String> coldDataFiles = dataTieringManager.getColdFilesList(); assertEquals(1, coldDataFiles.size()); // hStoreFiles[3] is the cold file. @@ -268,6 +352,7 @@ public class TestDataTieringManager { */ @Test public void testBlockEvictions() throws Exception { + initializeTestEnvironment(); long capacitySize = 40 * 1024; int writeThreads = 3; int writerQLen = 64; @@ -318,6 +403,7 @@ public class TestDataTieringManager { */ @Test public void testBlockEvictionsAllColdBlocks() throws Exception { + initializeTestEnvironment(); long capacitySize = 40 * 1024; int writeThreads = 3; int writerQLen = 64; @@ -365,6 +451,7 @@ public class TestDataTieringManager { */ @Test public void testBlockEvictionsHotBlocks() throws Exception { + initializeTestEnvironment(); long capacitySize = 40 * 1024; int writeThreads = 3; int writerQLen = 64; @@ -412,6 +499,8 @@ public class TestDataTieringManager { public void testFeatureKeyDisabled() throws Exception { DataTieringManager.resetForTestingOnly(); defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, false); + initializeTestEnvironment(); + try { assertFalse(DataTieringManager.instantiate(defaultConf, testOnlineRegions)); // Verify that the DataaTieringManager instance is not instantiated in the @@ -544,7 +633,20 @@ public class TestDataTieringManager { testDataTieringMethodWithKey(caller, key, expectedResult, null); } + private static void initializeTestEnvironment() throws IOException { + setupFileSystemAndCache(); + setupOnlineRegions(); + } + + private static void setupFileSystemAndCache() throws IOException { + fs = HFileSystem.get(defaultConf); + blockCache = BlockCacheFactory.createBlockCache(defaultConf); + cacheConf = new CacheConfig(defaultConf, blockCache); + } + private static void setupOnlineRegions() throws IOException { + testOnlineRegions.clear(); + hStoreFiles.clear(); long day = 24 * 60 * 60 * 1000; long currentTime = System.currentTimeMillis(); @@ -604,7 +706,12 @@ public class TestDataTieringManager { HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs, CommonFSUtils.getTableDir(testDir, hri.getTable()), hri); - return new HRegion(regionFs, null, conf, htd, null); + HRegion region = new HRegion(regionFs, null, conf, htd, null); + // Manually sets the BlockCache for the HRegion instance. + // This is necessary because the region server is not started within this method, + // and therefore the BlockCache needs to be explicitly configured. + region.setBlockCache(blockCache); + return region; } private static HStore createHStore(HRegion region, String columnFamily) throws IOException { @@ -637,24 +744,52 @@ public class TestDataTieringManager { StoreFileWriter storeFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs) .withOutputDir(storeDir).withFileContext(new HFileContextBuilder().build()).build(); - writeStoreFileRandomData(storeFileWriter, Bytes.toBytes(columnFamily), Bytes.toBytes("random"), - timestamp); + writeStoreFileRandomData(storeFileWriter, Bytes.toBytes(columnFamily), timestamp); return new HStoreFile(fs, storeFileWriter.getPath(), conf, cacheConf, BloomType.NONE, true); } + /** + * Writes random data to a store file with rows arranged in lexicographically increasing order. + * Each row is generated using the {@link #nextString()} method, ensuring that each subsequent row + * is lexicographically larger than the previous one. + */ private static void writeStoreFileRandomData(final StoreFileWriter writer, byte[] columnFamily, - byte[] qualifier, long timestamp) throws IOException { + long timestamp) throws IOException { + int cellsPerFile = 10; + byte[] qualifier = Bytes.toBytes("qualifier"); + byte[] value = generateRandomBytes(4 * 1024); try { - for (char d = 'a'; d <= 'z'; d++) { - for (char e = 'a'; e <= 'z'; e++) { - byte[] b = new byte[] { (byte) d, (byte) e }; - writer.append(new KeyValue(b, columnFamily, qualifier, timestamp, b)); - } + for (int i = 0; i < cellsPerFile; i++) { + byte[] row = Bytes.toBytes(nextString()); + writer.append(new KeyValue(row, columnFamily, qualifier, timestamp, value)); } } finally { writer.appendTrackedTimestampsToMetadata(); writer.close(); } } + + private static byte[] generateRandomBytes(int sizeInBytes) { + Random random = new Random(); + byte[] randomBytes = new byte[sizeInBytes]; + random.nextBytes(randomBytes); + return randomBytes; + } + + /** + * Returns the lexicographically larger string every time it's called. + */ + private static String nextString() { + if (rowKeyString == null || rowKeyString.isEmpty()) { + rowKeyString = "a"; + } + char lastChar = rowKeyString.charAt(rowKeyString.length() - 1); + if (lastChar < 'z') { + rowKeyString = rowKeyString.substring(0, rowKeyString.length() - 1) + (char) (lastChar + 1); + } else { + rowKeyString = rowKeyString + "a"; + } + return rowKeyString; + } }