This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch HBASE-28463
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28463 by this push:
new efa1ba3d498 HBASE-29427 Merge all commits related to custom tiering into the feature branch (#7124)
efa1ba3d498 is described below
commit efa1ba3d49858f465f4ffaf006bfc9f5ca3f8938
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Tue Jul 15 11:48:35 2025 +0100
HBASE-29427 Merge all commits related to custom tiering into the feature branch (#7124)
This is the complete custom tiering implementation and comprises the following individual pieces of work:
* HBASE-29412 Extend date tiered compaction to allow for tiering by values other than cell timestamp
* HBASE-29413 Implement a custom qualifier tiered compaction
* HBASE-29414 Refactor DataTieringManager to make priority logic pluggable
* HBASE-29422 Implement selectMinorCompaction in CustomCellDateTieredCompactionPolicy
* HBASE-29424 Implement configuration validation for custom tiering compactions
* HBASE-29425 Refine and polish code
* HBASE-29426 Define a tiering value provider and refactor custom tiered compaction related classes
* HBASE-28463 Rebase time based priority branch (HBASE-28463) with latest master (and fix conflicts)
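As a usage sketch for the changes listed above (illustrative only, not part of this commit; the table name, family name and "event_ts" qualifier are placeholders): a column family opts into custom tiering by selecting the new store engine and naming the qualifier that carries the tiering value, using the configuration keys introduced by this patch.

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.TableDescriptor;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CustomTieringExample {
      public static TableDescriptor withCustomTiering() {
        return TableDescriptorBuilder.newBuilder(TableName.valueOf("events"))
          .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
            // Route this family through the engine added by this patch.
            .setConfiguration("hbase.hstore.engine.class",
              "org.apache.hadoop.hbase.regionserver.CustomTieredStoreEngine")
            // Qualifier whose 8-byte long value drives tiering; required by
            // CustomCellTieredUtils.checkForModifyTable (see diff below).
            .setConfiguration("TIERING_CELL_QUALIFIER", "event_ts")
            .build())
          .build();
      }
    }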
Co-authored-by: Janardhan Hungund <[email protected]>
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
---
.../main/java/org/apache/hadoop/hbase/TagType.java | 2 +
.../apache/hadoop/hbase/io/hfile/BlockCache.java | 9 +-
.../apache/hadoop/hbase/io/hfile/CacheConfig.java | 3 +-
.../hadoop/hbase/io/hfile/CombinedBlockCache.java | 7 +-
.../org/apache/hadoop/hbase/io/hfile/HFile.java | 7 +
.../hadoop/hbase/io/hfile/HFileReaderImpl.java | 2 +-
.../hadoop/hbase/io/hfile/HFileWriterImpl.java | 22 +-
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 10 +-
.../master/procedure/CreateTableProcedure.java | 3 +
.../master/procedure/ModifyTableProcedure.java | 2 +
.../hadoop/hbase/regionserver/CellTSTiering.java | 57 +++++
.../regionserver/CustomTieredStoreEngine.java | 56 +++++
.../hadoop/hbase/regionserver/CustomTiering.java | 58 +++++
.../regionserver/CustomTieringMultiFileWriter.java | 85 +++++++
.../{DataTieringType.java => DataTiering.java} | 11 +-
.../hbase/regionserver/DataTieringManager.java | 98 ++------
.../hadoop/hbase/regionserver/DataTieringType.java | 15 +-
.../regionserver/DateTieredMultiFileWriter.java | 20 +-
.../hbase/regionserver/DateTieredStoreEngine.java | 17 +-
.../hadoop/hbase/regionserver/StoreFileWriter.java | 13 +
.../hbase/regionserver/compactions/Compactor.java | 6 +
.../compactions/CustomCellTieredUtils.java | 49 ++++
.../CustomCellTieringValueProvider.java | 87 +++++++
.../CustomDateTieredCompactionPolicy.java | 155 ++++++++++++
.../compactions/CustomTieredCompactor.java | 74 ++++++
.../compactions/DateTieredCompactionPolicy.java | 129 ++++++----
.../compactions/DateTieredCompactor.java | 12 +-
.../TestHFileInlineToRootChunkConversion.java | 3 +-
....java => TestCustomCellDataTieringManager.java} | 163 ++++++------
.../TestCustomCellTieredCompactionPolicy.java | 275 +++++++++++++++++++++
.../hbase/regionserver/TestDataTieringManager.java | 19 +-
.../compactions/TestCustomCellTieredCompactor.java | 148 +++++++++++
32 files changed, 1372 insertions(+), 245 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index eb9a7f3eccc..b0df4920e4e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -36,4 +36,6 @@ public final class TagType {
// String based tag type used in replication
public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
public static final byte TTL_TAG_TYPE = (byte) 8;
+ // tag with the custom cell tiering value for the row
+ public static final byte CELL_VALUE_TIERING_TAG_TYPE = (byte) 9;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index cef5a6488fa..9297e7074a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -23,7 +23,6 @@ import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@@ -214,13 +213,13 @@ public interface BlockCache extends Iterable<CachedBlock>, ConfigurationObserver
* not be overridden by all implementing classes. In such cases, the returned Optional will be
* empty. For subclasses implementing this logic, the returned Optional would contain the boolean
* value reflecting if the passed block should indeed be cached.
- * @param key The key representing the block to check if it should be cached.
- * @param timeRangeTracker the time range tracker containing the timestamps
- * @param conf The configuration object to use for determining caching behavior.
+ * @param key The key representing the block to check if it should be cached.
+ * @param maxTimeStamp The maximum timestamp for the block to check if it should be cached.
+ * @param conf The configuration object to use for determining caching behavior.
* @return An empty Optional if this method is not supported; otherwise, the returned Optional
* contains the boolean value indicating if the block should be cached.
*/
- default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker,
+ default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimeStamp,
Configuration conf) {
return Optional.empty();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 3fcf75b3970..72ca37c0557 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -282,7 +282,8 @@ public class CacheConfig implements PropagatingConfigurationObserver {
public boolean shouldCacheBlockOnRead(BlockCategory category, HFileInfo hFileInfo,
Configuration conf) {
Optional<Boolean> cacheFileBlock = Optional.of(true);
- if (getBlockCache().isPresent()) {
+ // For DATA blocks only, if BucketCache is in use, we don't need to cache the block again
+ if (getBlockCache().isPresent() && category.equals(BlockCategory.DATA)) {
Optional<Boolean> result = getBlockCache().get().shouldCacheFile(hFileInfo, conf);
if (result.isPresent()) {
cacheFileBlock = result;
if (result.isPresent()) {
cacheFileBlock = result;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index 672a7bc1e72..e5d52858ab6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -494,10 +493,10 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
}
@Override
- public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker,
+ public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimeStamp,
Configuration conf) {
- return combineCacheResults(l1Cache.shouldCacheBlock(key, timeRangeTracker, conf),
- l2Cache.shouldCacheBlock(key, timeRangeTracker, conf));
+ return combineCacheResults(l1Cache.shouldCacheBlock(key, maxTimeStamp, conf),
+ l2Cache.shouldCacheBlock(key, maxTimeStamp, conf));
}
private Optional<Boolean> combineCacheResults(Optional<Boolean> result1,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index d8dffce59e8..a99eac4085e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -217,6 +218,12 @@ public final class HFile {
*/
void appendTrackedTimestampsToMetadata() throws IOException;
+ /**
+ * Add Custom cell timestamp to Metadata
+ */
+ public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
+ throws IOException;
+
/** Returns the path to this {@link HFile} */
Path getPath();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 4b60ef662c2..972e8070e1c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1380,7 +1380,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
HFileBlock unpackedNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, unpacked);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
- if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+ if (cacheBlock && cacheOnRead) {
// Using the wait on cache during compaction and prefetching.
cache.cacheBlock(cacheKey,
cacheCompressed
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 96bfe42f1fd..684aee3beac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
+import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
@@ -29,6 +30,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -127,6 +129,12 @@ public class HFileWriterImpl implements HFile.Writer {
/** Cache configuration for caching data on write. */
protected final CacheConfig cacheConf;
+ public void setTimeRangeTrackerForTiering(Supplier<TimeRangeTracker> timeRangeTrackerForTiering) {
+ this.timeRangeTrackerForTiering = timeRangeTrackerForTiering;
+ }
+
+ private Supplier<TimeRangeTracker> timeRangeTrackerForTiering;
+
/**
* Name for this object used when logging or in toString. Is either the result of a toString on
* stream or else name of passed file Path.
@@ -186,7 +194,9 @@ public class HFileWriterImpl implements HFile.Writer {
this.path = path;
this.name = path != null ? path.getName() : outputStream.toString();
this.hFileContext = fileContext;
+ // TODO: Move this back to upper layer
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
+ this.timeRangeTrackerForTiering = () -> this.timeRangeTracker;
DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
if (encoding != DataBlockEncoding.NONE) {
this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
@@ -588,7 +598,8 @@ public class HFileWriterImpl implements HFile.Writer {
}
private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
- Optional<Boolean> result = cache.shouldCacheBlock(key, timeRangeTracker, conf);
+ Optional<Boolean> result =
+ cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf);
return result.orElse(true);
}
@@ -899,12 +910,19 @@ public class HFileWriterImpl implements HFile.Writer {
appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
}
+ public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
+ throws IOException {
+ // TODO: The StoreFileReader always converts the byte[] to TimeRange
+ // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
+ appendFileInfo(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker));
+ }
+
/**
* Record the earliest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker
* to include the timestamp of this key
*/
private void trackTimestamps(final ExtendedCell cell) {
- if (Cell.Type.Put == cell.getType()) {
+ if (KeyValue.Type.Put == KeyValue.Type.codeToType(cell.getTypeByte())) {
earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
}
timeRangeTracker.includeTimestamp(cell);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 2af46a49d89..5867fff0861 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.regionserver.DataTieringManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -1195,8 +1194,9 @@ public class BucketCache implements BlockCache, HeapSize {
}
}
- if (bytesFreed < bytesToFreeWithExtra &&
- coldFiles != null && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
+ if (
+ bytesFreed < bytesToFreeWithExtra && coldFiles != null
+ && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
) {
int freedBlockSize = bucketEntryWithKey.getValue().getLength();
if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) {
@@ -2458,10 +2458,10 @@ public class BucketCache implements BlockCache, HeapSize {
}
@Override
- public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker,
+ public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimestamp,
Configuration conf) {
DataTieringManager dataTieringManager = DataTieringManager.getInstance();
- if (dataTieringManager != null && !dataTieringManager.isHotData(timeRangeTracker, conf)) {
+ if (dataTieringManager != null && !dataTieringManager.isHotData(maxTimestamp, conf)) {
LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data",
key.getHfileName());
return Optional.of(false);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index 23ad3b42aef..423297f667d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredUtils;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
@@ -314,6 +315,8 @@ public class CreateTableProcedure extends AbstractStateMachineTableProcedure<Cre
StoreFileTrackerValidationUtils.checkForCreateTable(env.getMasterConfiguration(),
tableDescriptor);
+ CustomCellTieredUtils.checkForModifyTable(tableDescriptor);
+
return true;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 03ad19799cd..95896838dc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.fs.ErasureCodingUtils;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredUtils;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.util.Bytes;
@@ -420,6 +421,7 @@ public class ModifyTableProcedure extends AbstractStateMachineTableProcedure<Mod
// check for store file tracker configurations
StoreFileTrackerValidationUtils.checkForModifyTable(env.getMasterConfiguration(),
unmodifiedTableDescriptor, modifiedTableDescriptor, !isTableEnabled(env));
+ CustomCellTieredUtils.checkForModifyTable(modifiedTableDescriptor);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellTSTiering.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellTSTiering.java
new file mode 100644
index 00000000000..ed7dc01ba8d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellTSTiering.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
+
+import java.io.IOException;
+import java.util.OptionalLong;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class CellTSTiering implements DataTiering {
+ private static final Logger LOG = LoggerFactory.getLogger(CellTSTiering.class);
+
+ public long getTimestamp(HStoreFile hStoreFile) {
+ OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
+ if (!maxTimestamp.isPresent()) {
+ LOG.debug("Maximum timestamp not present for {}", hStoreFile.getPath());
+ return Long.MAX_VALUE;
+ }
+ return maxTimestamp.getAsLong();
+ }
+
+ public long getTimestamp(HFileInfo hFileInfo) {
+ try {
+ byte[] hFileTimeRange = hFileInfo.get(TIMERANGE_KEY);
+ if (hFileTimeRange == null) {
+ LOG.debug("Timestamp information not found for file: {}",
+ hFileInfo.getHFileContext().getHFileName());
+ return Long.MAX_VALUE;
+ }
+ return TimeRangeTracker.parseFrom(hFileTimeRange).getMax();
+ } catch (IOException e) {
+ LOG.error("Error occurred while reading the timestamp metadata of file: {}",
+ hFileInfo.getHFileContext().getHFileName(), e);
+ return Long.MAX_VALUE;
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieredStoreEngine.java
new file mode 100644
index 00000000000..518b31fb5be
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieredStoreEngine.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.CustomDateTieredCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.CustomTieredCompactor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Extension of {@link DateTieredStoreEngine} that uses a pluggable value provider for extracting
+ * the value used for comparison in this tiered compaction. Unlike the existing date tiered
+ * compaction, it doesn't yield multiple tiers or files, but rather provides two tiers based on a
+ * configurable “cut-off” age. All rows with a cell tiering value older than this “cut-off” age
+ * are placed together in an “old” tier, whilst younger rows go to a separate, “young” tier file.
+ */
+@InterfaceAudience.Private
+public class CustomTieredStoreEngine extends DateTieredStoreEngine {
+
+ @Override
+ protected void createComponents(Configuration conf, HStore store, CellComparator kvComparator)
+ throws IOException {
+ CompoundConfiguration config = new CompoundConfiguration();
+ config.add(conf);
+ config.add(store.conf);
+ config.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+ CustomDateTieredCompactionPolicy.class.getName());
+ createCompactionPolicy(config, store);
+ this.storeFileManager = new DefaultStoreFileManager(kvComparator,
+ StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, config, compactionPolicy.getConf());
+ this.storeFlusher = new DefaultStoreFlusher(config, store);
+ this.compactor = new CustomTieredCompactor(config, store);
+ }
+
+}
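To picture the two-tier behavior described in the class comment, a minimal sketch (illustrative only; the age limit value is an assumed example, and the cut-off arithmetic mirrors CustomDateTieredCompactionPolicy further below):

    public class TierSketch {
      // A row lands in the "old" tier when its tiering value predates the cut-off.
      static String tierFor(long tieringValue, long cutOffTimestamp) {
        return tieringValue < cutOffTimestamp ? "old" : "young";
      }

      public static void main(String[] args) {
        long ageLimitMillis = 10L * 24 * 60 * 60 * 1000; // assumed example: ten days
        long cutOff = System.currentTimeMillis() - ageLimitMillis;
        System.out.println(tierFor(cutOff - 1, cutOff)); // old
        System.out.println(tierFor(cutOff + 1, cutOff)); // young
      }
    }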
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTiering.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTiering.java
new file mode 100644
index 00000000000..7a9914c87d3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTiering.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
+
+import java.io.IOException;
+import java.util.Date;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class CustomTiering implements DataTiering {
+ private static final Logger LOG = LoggerFactory.getLogger(CustomTiering.class);
+
+ private long getMaxTSFromTimeRange(byte[] hFileTimeRange, String hFileName) {
+ try {
+ if (hFileTimeRange == null) {
+ LOG.debug("Custom cell-based timestamp information not found for file: {}", hFileName);
+ return Long.MAX_VALUE;
+ }
+ long parsedValue = TimeRangeTracker.parseFrom(hFileTimeRange).getMax();
+ LOG.debug("Max TS for file {} is {}", hFileName, new Date(parsedValue));
+ return parsedValue;
+ } catch (IOException e) {
+ LOG.error("Error occurred while reading the Custom cell-based timestamp metadata of file: {}",
+ hFileName, e);
+ return Long.MAX_VALUE;
+ }
+ }
+
+ public long getTimestamp(HStoreFile hStoreFile) {
+ return getMaxTSFromTimeRange(hStoreFile.getMetadataValue(CUSTOM_TIERING_TIME_RANGE),
+ hStoreFile.getPath().getName());
+ }
+
+ public long getTimestamp(HFileInfo hFileInfo) {
+ return getMaxTSFromTimeRange(hFileInfo.get(CUSTOM_TIERING_TIME_RANGE),
+ hFileInfo.getHFileContext().getHFileName());
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java
new file mode 100644
index 00000000000..d2b88a501ec
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.function.Function;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class CustomTieringMultiFileWriter extends DateTieredMultiFileWriter {
+
+ public static final byte[] CUSTOM_TIERING_TIME_RANGE = Bytes.toBytes("CUSTOM_TIERING_TIME_RANGE");
+
+ private NavigableMap<Long, TimeRangeTracker> lowerBoundary2TimeRanger = new TreeMap<>();
+
+ public CustomTieringMultiFileWriter(List<Long> lowerBoundaries,
+ Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile,
+ Function<ExtendedCell, Long> tieringFunction) {
+ super(lowerBoundaries, lowerBoundariesPolicies, needEmptyFile, tieringFunction);
+ for (Long lowerBoundary : lowerBoundaries) {
+ lowerBoundary2TimeRanger.put(lowerBoundary, null);
+ }
+ }
+
+ @Override
+ public void append(ExtendedCell cell) throws IOException {
+ super.append(cell);
+ long tieringValue = tieringFunction.apply(cell);
+ Map.Entry<Long, TimeRangeTracker> entry = lowerBoundary2TimeRanger.floorEntry(tieringValue);
+ if (entry.getValue() == null) {
+ TimeRangeTracker timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
+ timeRangeTracker.setMin(tieringValue);
+ timeRangeTracker.setMax(tieringValue);
+ lowerBoundary2TimeRanger.put(entry.getKey(), timeRangeTracker);
+ ((HFileWriterImpl) lowerBoundary2Writer.get(entry.getKey()).getLiveFileWriter())
+ .setTimeRangeTrackerForTiering(() -> timeRangeTracker);
+ } else {
+ TimeRangeTracker timeRangeTracker = entry.getValue();
+ if (timeRangeTracker.getMin() > tieringValue) {
+ timeRangeTracker.setMin(tieringValue);
+ }
+ if (timeRangeTracker.getMax() < tieringValue) {
+ timeRangeTracker.setMax(tieringValue);
+ }
+ }
+ }
+
+ @Override
+ public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
+ Collection<HStoreFile> storeFiles) throws IOException {
+ for (Map.Entry<Long, StoreFileWriter> entry : this.lowerBoundary2Writer.entrySet()) {
+ StoreFileWriter writer = entry.getValue();
+ if (writer != null) {
+ writer.appendFileInfo(CUSTOM_TIERING_TIME_RANGE,
+ TimeRangeTracker.toByteArray(lowerBoundary2TimeRanger.get(entry.getKey())));
+ }
+ }
+ return super.commitWriters(maxSeqId, majorCompaction, storeFiles);
+ }
+
+}
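A companion sketch (an assumption, not code from this patch): the CUSTOM_TIERING_TIME_RANGE metadata that commitWriters attaches above can be read back from a store file the same way CustomTiering does, with Long.MAX_VALUE as the fallback when the metadata is absent.

    import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;

    import java.io.IOException;
    import org.apache.hadoop.hbase.regionserver.HStoreFile;
    import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;

    final class TieringRangeReader {
      // Max tiering value recorded for the file, or Long.MAX_VALUE when absent.
      static long maxTieringValue(HStoreFile file) throws IOException {
        byte[] raw = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
        return raw == null ? Long.MAX_VALUE : TimeRangeTracker.parseFrom(raw).getMax();
      }
    }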
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTiering.java
similarity index 82%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTiering.java
index ee54576a648..51e89b0b79d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTiering.java
@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.yetus.audience.InterfaceAudience;
-@InterfaceAudience.Public
-public enum DataTieringType {
- NONE,
- TIME_RANGE
+@InterfaceAudience.Private
+public interface DataTiering {
+ long getTimestamp(HStoreFile hFile);
+
+ long getTimestamp(HFileInfo hFileInfo);
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
index aa56e3f6444..2a5e2a5aa39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java
@@ -17,13 +17,11 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
-
import java.io.IOException;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.OptionalLong;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -136,17 +134,18 @@ public class DataTieringManager {
* the data tiering type is set to {@link DataTieringType#TIME_RANGE}, it uses the maximum
* timestamp from the time range tracker to determine if the data is hot. Otherwise, it considers
* the data as hot by default.
- * @param timeRangeTracker the time range tracker containing the timestamps
- * @param conf The configuration object to use for determining hot data criteria.
+ * @param maxTimestamp the maximum timestamp associated with the data.
+ * @param conf The configuration object to use for determining hot data criteria.
* @return {@code true} if the data is hot, {@code false} otherwise
*/
- public boolean isHotData(TimeRangeTracker timeRangeTracker, Configuration conf) {
+ public boolean isHotData(long maxTimestamp, Configuration conf) {
DataTieringType dataTieringType = getDataTieringType(conf);
+
if (
- dataTieringType.equals(DataTieringType.TIME_RANGE)
- && timeRangeTracker.getMax() != TimeRangeTracker.INITIAL_MAX_TIMESTAMP
+ !dataTieringType.equals(DataTieringType.NONE)
+ && maxTimestamp != TimeRangeTracker.INITIAL_MAX_TIMESTAMP
) {
- return hotDataValidator(timeRangeTracker.getMax(), getDataTieringHotDataAge(conf));
+ return hotDataValidator(maxTimestamp, getDataTieringHotDataAge(conf));
}
// DataTieringType.NONE or other types are considered hot by default
return true;
@@ -165,29 +164,14 @@ public class DataTieringManager {
Configuration configuration = getConfiguration(hFilePath);
DataTieringType dataTieringType = getDataTieringType(configuration);
- if (dataTieringType.equals(DataTieringType.TIME_RANGE)) {
- return hotDataValidator(getMaxTimestamp(hFilePath), getDataTieringHotDataAge(configuration));
- }
- // DataTieringType.NONE or other types are considered hot by default
- return true;
- }
-
- /**
- * Determines whether the data in the HFile at the given path is considered hot based on the
- * configured data tiering type and hot data age. If the data tiering type is set to
- * {@link DataTieringType#TIME_RANGE}, it validates the data against the provided maximum
- * timestamp.
- * @param hFilePath the path to the HFile
- * @param maxTimestamp the maximum timestamp to validate against
- * @return {@code true} if the data is hot, {@code false} otherwise
- * @throws DataTieringException if there is an error retrieving data tiering information
- */
- public boolean isHotData(Path hFilePath, long maxTimestamp) throws DataTieringException {
- Configuration configuration = getConfiguration(hFilePath);
- DataTieringType dataTieringType = getDataTieringType(configuration);
-
- if (dataTieringType.equals(DataTieringType.TIME_RANGE)) {
- return hotDataValidator(maxTimestamp, getDataTieringHotDataAge(configuration));
+ if (!dataTieringType.equals(DataTieringType.NONE)) {
+ HStoreFile hStoreFile = getHStoreFile(hFilePath);
+ if (hStoreFile == null) {
+ throw new DataTieringException(
+ "Store file corresponding to " + hFilePath + " doesn't exist");
+ }
+ return hotDataValidator(dataTieringType.getInstance().getTimestamp(getHStoreFile(hFilePath)),
+ getDataTieringHotDataAge(configuration));
}
// DataTieringType.NONE or other types are considered hot by default
return true;
@@ -204,8 +188,9 @@ public class DataTieringManager {
*/
public boolean isHotData(HFileInfo hFileInfo, Configuration configuration) {
DataTieringType dataTieringType = getDataTieringType(configuration);
- if (dataTieringType.equals(DataTieringType.TIME_RANGE)) {
- return hotDataValidator(getMaxTimestamp(hFileInfo), getDataTieringHotDataAge(configuration));
+ if (hFileInfo != null && !dataTieringType.equals(DataTieringType.NONE)) {
+ return hotDataValidator(dataTieringType.getInstance().getTimestamp(hFileInfo),
+ getDataTieringHotDataAge(configuration));
}
// DataTieringType.NONE or other types are considered hot by default
return true;
@@ -217,36 +202,6 @@ public class DataTieringManager {
return diff <= hotDataAge;
}
- private long getMaxTimestamp(Path hFilePath) throws DataTieringException {
- HStoreFile hStoreFile = getHStoreFile(hFilePath);
- if (hStoreFile == null) {
- LOG.error("HStoreFile corresponding to {} doesn't exist", hFilePath);
- return Long.MAX_VALUE;
- }
- OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
- if (!maxTimestamp.isPresent()) {
- LOG.error("Maximum timestamp not present for {}", hFilePath);
- return Long.MAX_VALUE;
- }
- return maxTimestamp.getAsLong();
- }
-
- private long getMaxTimestamp(HFileInfo hFileInfo) {
- try {
- byte[] hFileTimeRange = hFileInfo.get(TIMERANGE_KEY);
- if (hFileTimeRange == null) {
- LOG.error("Timestamp information not found for file: {}",
- hFileInfo.getHFileContext().getHFileName());
- return Long.MAX_VALUE;
- }
- return TimeRangeTracker.parseFrom(hFileTimeRange).getMax();
- } catch (IOException e) {
- LOG.error("Error occurred while reading the timestamp metadata of file: {}",
- hFileInfo.getHFileContext().getHFileName(), e);
- return Long.MAX_VALUE;
- }
- }
-
private long getCurrentTimestamp() {
return EnvironmentEdgeManager.getDelegate().currentTime();
}
@@ -299,7 +254,7 @@ private HStoreFile getHStoreFile(Path hFilePath) throws DataTieringException {
{
HStore hStore = getHStore(hFilePath);
for (HStoreFile file : hStore.getStorefiles()) {
- if (file.getPath().equals(hFilePath)) {
+ if (file.getPath().toUri().getPath().toString().equals(hFilePath.toString())) {
return file;
}
}
@@ -330,7 +285,8 @@ public class DataTieringManager {
for (HRegion r : this.onlineRegions.values()) {
for (HStore hStore : r.getStores()) {
Configuration conf = hStore.getReadOnlyConfiguration();
- if (getDataTieringType(conf) != DataTieringType.TIME_RANGE) {
+ DataTieringType dataTieringType = getDataTieringType(conf);
+ if (dataTieringType == DataTieringType.NONE) {
// Data-Tiering not enabled for the store. Just skip it.
continue;
}
@@ -339,14 +295,10 @@ public class DataTieringManager {
for (HStoreFile hStoreFile : hStore.getStorefiles()) {
String hFileName =
hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName();
- OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
- if (!maxTimestamp.isPresent()) {
- LOG.warn("maxTimestamp missing for file: {}",
- hStoreFile.getFileInfo().getActiveFileName());
- continue;
- }
+ long maxTimeStamp = dataTieringType.getInstance().getTimestamp(hStoreFile);
+ LOG.debug("Max TS for file {} is {}", hFileName, new Date(maxTimeStamp));
long currentTimestamp = EnvironmentEdgeManager.getDelegate().currentTime();
- long fileAge = currentTimestamp - maxTimestamp.getAsLong();
+ long fileAge = currentTimestamp - maxTimeStamp;
if (fileAge > hotDataAge) {
// Values do not matter.
coldFiles.put(hFileName, null);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java
index ee54576a648..83da5f54e43 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java
@@ -21,6 +21,17 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Public
public enum DataTieringType {
- NONE,
- TIME_RANGE
+ NONE(null),
+ TIME_RANGE(new CellTSTiering()),
+ CUSTOM(new CustomTiering());
+
+ private final DataTiering instance;
+
+ DataTieringType(DataTiering instance) {
+ this.instance = instance;
+ }
+
+ public DataTiering getInstance() {
+ return instance;
+ }
}
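A small usage sketch of the reworked enum (an assumption, mirroring how DataTieringManager consumes it above): NONE deliberately carries a null DataTiering instance, so callers guard it before dereferencing.

    import org.apache.hadoop.hbase.regionserver.DataTieringType;
    import org.apache.hadoop.hbase.regionserver.HStoreFile;

    final class TieringTypeUsage {
      static long maxTimestampFor(DataTieringType type, HStoreFile file) {
        return type == DataTieringType.NONE
          ? Long.MAX_VALUE // NONE has no instance; data is treated as hot by default
          : type.getInstance().getTimestamp(file);
      }
    }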
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
index b800178e8a2..e01f062a019 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
+import java.util.function.Function;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.yetus.audience.InterfaceAudience;
@@ -33,12 +34,14 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
- private final NavigableMap<Long, StoreFileWriter> lowerBoundary2Writer = new TreeMap<>();
+ protected final NavigableMap<Long, StoreFileWriter> lowerBoundary2Writer = new TreeMap<>();
private final boolean needEmptyFile;
private final Map<Long, String> lowerBoundariesPolicies;
+ protected Function<ExtendedCell, Long> tieringFunction;
+
/**
* @param lowerBoundariesPolicies each window to storage policy map.
* @param needEmptyFile whether need to create an empty store file if we haven't written
@@ -46,16 +49,29 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
*/
public DateTieredMultiFileWriter(List<Long> lowerBoundaries,
Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile) {
+ this(lowerBoundaries, lowerBoundariesPolicies, needEmptyFile, c -> c.getTimestamp());
+ }
+
+ /**
+ * @param lowerBoundariesPolicies each window to storage policy map.
+ * @param needEmptyFile whether need to create an empty store file if we haven't written
+ * out anything.
+ */
+ public DateTieredMultiFileWriter(List<Long> lowerBoundaries,
+ Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile,
+ Function<ExtendedCell, Long> tieringFunction) {
for (Long lowerBoundary : lowerBoundaries) {
lowerBoundary2Writer.put(lowerBoundary, null);
}
this.needEmptyFile = needEmptyFile;
this.lowerBoundariesPolicies = lowerBoundariesPolicies;
+ this.tieringFunction = tieringFunction;
}
@Override
public void append(ExtendedCell cell) throws IOException {
- Map.Entry<Long, StoreFileWriter> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp());
+ Map.Entry<Long, StoreFileWriter> entry =
+ lowerBoundary2Writer.floorEntry(tieringFunction.apply(cell));
StoreFileWriter writer = entry.getValue();
if (writer == null) {
String lowerBoundaryStoragePolicy = lowerBoundariesPolicies.get(entry.getKey());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
index 26437ab1124..dc13f190afa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
+
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -29,6 +31,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequ
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -44,6 +47,18 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
public static final String DATE_TIERED_STORE_ENGINE = DateTieredStoreEngine.class.getName();
+ protected void createCompactionPolicy(Configuration conf, HStore store) throws IOException {
+ String className =
+ conf.get(DEFAULT_COMPACTION_POLICY_CLASS_KEY, DateTieredCompactionPolicy.class.getName());
+ try {
+ compactionPolicy = ReflectionUtils.instantiateWithCustomCtor(className,
+ new Class[] { Configuration.class, StoreConfigInformation.class },
+ new Object[] { conf, store });
+ } catch (Exception e) {
+ throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
+ }
+ }
+
@Override
public boolean needsCompaction(List<HStoreFile> filesCompacting) {
return compactionPolicy.needsCompaction(storeFileManager.getStoreFiles(), filesCompacting);
@@ -57,7 +72,7 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
@Override
protected void createComponents(Configuration conf, HStore store, CellComparator kvComparator)
throws IOException {
- this.compactionPolicy = new DateTieredCompactionPolicy(conf, store);
+ createCompactionPolicy(conf, store);
this.storeFileManager = new DefaultStoreFileManager(kvComparator,
StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, conf, compactionPolicy.getConf());
this.storeFlusher = new DefaultStoreFlusher(conf, store);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
index 569b9d3faa6..b5732d3b23a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
@@ -255,6 +255,14 @@ public class StoreFileWriter implements CellSink, ShipperListener {
}
}
+ public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
+ throws IOException {
+ liveFileWriter.appendCustomCellTimestampsToMetadata(timeRangeTracker);
+ if (historicalFileWriter != null) {
+ historicalFileWriter.appendCustomCellTimestampsToMetadata(timeRangeTracker);
+ }
+ }
+
@Override
public void beforeShipped() throws IOException {
liveFileWriter.beforeShipped();
@@ -663,6 +671,11 @@ public class StoreFileWriter implements CellSink, ShipperListener {
writer.appendTrackedTimestampsToMetadata();
}
+ public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
+ throws IOException {
+ writer.appendCustomCellTimestampsToMetadata(timeRangeTracker);
+ }
+
private void appendGeneralBloomfilter(final ExtendedCell cell) throws IOException {
if (this.generalBloomFilterWriter != null) {
/*
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 055ad85e5a3..069968294b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -403,6 +403,11 @@ public abstract class Compactor<T extends CellSink> {
protected abstract void abortWriter(T writer) throws IOException;
+ protected List<ExtendedCell> decorateCells(List<ExtendedCell> cells) {
+ // no op
+ return cells;
+ }
+
/**
* Performs the compaction.
* @param fd FileDetails of cell sink writer
@@ -459,6 +464,7 @@ public abstract class Compactor<T extends CellSink> {
// output to writer:
Cell lastCleanCell = null;
long lastCleanCellSeqId = 0;
+ cells = decorateCells(cells);
for (ExtendedCell c : cells) {
if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
lastCleanCell = c;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredUtils.java
new file mode 100644
index 00000000000..f908b31e4ae
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.apache.hadoop.hbase.regionserver.StoreEngine.STORE_ENGINE_CLASS_KEY;
+import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieringValueProvider.TIERING_CELL_QUALIFIER;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class CustomCellTieredUtils {
+ private CustomCellTieredUtils() {
+ // Utility class, no instantiation
+ }
+
+ public static void checkForModifyTable(TableDescriptor newTable) throws IOException {
+ for (ColumnFamilyDescriptor descriptor : newTable.getColumnFamilies()) {
+ String storeEngineClassName = descriptor.getConfigurationValue(STORE_ENGINE_CLASS_KEY);
+ if (
+ storeEngineClassName != null && storeEngineClassName.contains("CustomCellTieredStoreEngine")
+ ) {
+ if (descriptor.getConfigurationValue(TIERING_CELL_QUALIFIER) == null) {
+ throw new DoNotRetryIOException("StoreEngine " + storeEngineClassName
+ + " is missing required " + TIERING_CELL_QUALIFIER + " parameter.");
+ }
+ }
+ }
+ }
+
+}
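A brief sketch of how this validation surfaces (an assumption; the procedures above simply propagate the exception, failing the create/modify table request):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.TableDescriptor;
    import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredUtils;

    final class TieringValidationSketch {
      // True when the descriptor passes the check wired into the table procedures above.
      static boolean isValid(TableDescriptor descriptor) {
        try {
          CustomCellTieredUtils.checkForModifyTable(descriptor);
          return true;
        } catch (IOException e) {
          // DoNotRetryIOException: "... is missing required TIERING_CELL_QUALIFIER parameter."
          return false;
        }
      }
    }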
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieringValueProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieringValueProvider.java
new file mode 100644
index 00000000000..fca76bae8f8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieringValueProvider.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A {@link CustomTieredCompactor.TieringValueProvider} that reads the per-row tiering value from a
+ * configurable cell qualifier and, via decorateCells, propagates it to every cell of the row as a
+ * tag during compaction, allowing custom values to determine the different file tiers.
+ */
+@InterfaceAudience.Private
+public class CustomCellTieringValueProvider implements CustomTieredCompactor.TieringValueProvider {
+ public static final String TIERING_CELL_QUALIFIER = "TIERING_CELL_QUALIFIER";
+ private byte[] tieringQualifier;
+
+ @Override
+ public void init(Configuration conf) throws Exception {
+ tieringQualifier = Bytes.toBytes(conf.get(TIERING_CELL_QUALIFIER));
+ }
+
+ @Override
+ public List<ExtendedCell> decorateCells(List<ExtendedCell> cells) {
+ // if no tiering qualifier properly set, skips the whole flow
+ if (tieringQualifier != null) {
+ byte[] tieringValue = null;
+ // first iterates through the cells within a row, to find the tiering value for the row
+ for (Cell cell : cells) {
+ if (CellUtil.matchingQualifier(cell, tieringQualifier)) {
+ tieringValue = CellUtil.cloneValue(cell);
+ break;
+ }
+ }
+ if (tieringValue == null) {
+ tieringValue = Bytes.toBytes(Long.MAX_VALUE);
+ }
+ // now apply the tiering value as a tag to all cells within the row
+ Tag tieringValueTag = new ArrayBackedTag(TagType.CELL_VALUE_TIERING_TAG_TYPE, tieringValue);
+ List<ExtendedCell> newCells = new ArrayList<>(cells.size());
+ for (ExtendedCell cell : cells) {
+ List<Tag> tags = PrivateCellUtil.getTags(cell);
+ tags.add(tieringValueTag);
+ newCells.add(PrivateCellUtil.createCell(cell, tags));
+ }
+ return newCells;
+ } else {
+ return cells;
+ }
+ }
+
+ @Override
+ public long getTieringValue(ExtendedCell cell) {
+ Optional<Tag> tagOptional = PrivateCellUtil.getTag(cell, TagType.CELL_VALUE_TIERING_TAG_TYPE);
+ if (tagOptional.isPresent()) {
+ Tag tag = tagOptional.get();
+ return Bytes.toLong(tag.getValueByteBuffer().array(), tag.getValueOffset(),
+ tag.getValueLength());
+ }
+ return Long.MAX_VALUE;
+ }
+}
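On the writer side, a sketch (an assumption): because getTieringValue decodes the tag with Bytes.toLong, the configured tiering qualifier is expected to hold an 8-byte long, such as an event time in epoch millis; the family and qualifier names are placeholders.

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    final class TieringValueWrite {
      static Put eventRow(byte[] row, long eventTimeMillis) {
        // "cf"/"event_ts" stand in for the family and the qualifier configured
        // through TIERING_CELL_QUALIFIER.
        return new Put(row).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("event_ts"),
          Bytes.toBytes(eventTimeMillis));
      }
    }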
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomDateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomDateTieredCompactionPolicy.java
new file mode 100644
index 00000000000..dcc97c63d02
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomDateTieredCompactionPolicy.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Custom implementation of DateTieredCompactionPolicy that calculates compaction boundaries based
+ * on the <b>hbase.hstore.compaction.date.tiered.custom.age.limit.millis</b> configuration property
+ * and the custom tiering time range (CUSTOM_TIERING_TIME_RANGE) stored in the metadata of each
+ * store file. This policy produces either one or two tiers: one tier if all files' data is older,
+ * or all younger, than the configured age limit; two tiers if files have both younger and older
+ * data than the configured age limit.
+ */
+@InterfaceAudience.Private
+public class CustomDateTieredCompactionPolicy extends DateTieredCompactionPolicy {
+
+ public static final String AGE_LIMIT_MILLIS =
+ "hbase.hstore.compaction.date.tiered.custom.age.limit.millis";
+
+ // Defaults to 10 years
+ public static final long DEFAULT_AGE_LIMIT_MILLIS =
+ (long) (10L * 365.25 * 24L * 60L * 60L * 1000L);
+
+ private static final Logger LOG = LoggerFactory.getLogger(CustomDateTieredCompactionPolicy.class);
+
+ private long cutOffTimestamp;
+
+ public CustomDateTieredCompactionPolicy(Configuration conf,
+ StoreConfigInformation storeConfigInfo) throws IOException {
+ super(conf, storeConfigInfo);
+ cutOffTimestamp = EnvironmentEdgeManager.currentTime()
+ - conf.getLong(AGE_LIMIT_MILLIS, DEFAULT_AGE_LIMIT_MILLIS);
+
+ }
+
+ @Override
+ protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile>
filesToCompact,
+ long now) {
+ MutableLong min = new MutableLong(Long.MAX_VALUE);
+ MutableLong max = new MutableLong(0);
+ filesToCompact.forEach(f -> {
+ byte[] timeRangeBytes = f.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
+ long minCurrent = Long.MAX_VALUE;
+ long maxCurrent = 0;
+ if (timeRangeBytes != null) {
+ try {
+          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(timeRangeBytes);
+          minCurrent = timeRangeTracker.getMin();
+          maxCurrent = timeRangeTracker.getMax();
+ } catch (IOException e) {
+          LOG.warn("Found CUSTOM_TIERING_TIME_RANGE info in file, but failed to parse it:", e);
+ }
+ }
+ if (minCurrent < min.getValue()) {
+ min.setValue(minCurrent);
+ }
+ if (maxCurrent > max.getValue()) {
+ max.setValue(maxCurrent);
+ }
+ });
+
+ List<Long> boundaries = new ArrayList<>();
+ boundaries.add(Long.MIN_VALUE);
+ if (min.getValue() < cutOffTimestamp) {
+ boundaries.add(min.getValue());
+ if (max.getValue() > cutOffTimestamp) {
+ boundaries.add(cutOffTimestamp);
+ }
+ }
+ return boundaries;
+ }
+
+ @Override
+ public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile>
candidateSelection,
+ boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
+ ArrayList<HStoreFile> filteredByPolicy = this.compactionPolicyPerWindow
+ .applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
+ return selectMajorCompaction(filteredByPolicy);
+ }
+
+ @Override
+ public boolean shouldPerformMajorCompaction(Collection<HStoreFile>
filesToCompact)
+ throws IOException {
+ long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
+ long now = EnvironmentEdgeManager.currentTime();
+ if (isMajorCompactionTime(filesToCompact, now, lowTimestamp)) {
+ long cfTTL = this.storeConfigInfo.getStoreFileTtl();
+ int countLower = 0;
+ int countHigher = 0;
+ HDFSBlocksDistribution hdfsBlocksDistribution = new
HDFSBlocksDistribution();
+ for (HStoreFile f : filesToCompact) {
+ if (checkForTtl(cfTTL, f)) {
+ return true;
+ }
+ if (isMajorOrBulkloadResult(f, now - lowTimestamp)) {
+ return true;
+ }
+        byte[] timeRangeBytes = f.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
+        // Files written before custom tiering was enabled may lack this metadata; treat them as
+        // young data, consistent with getCompactBoundariesForMajor above.
+        TimeRangeTracker timeRangeTracker =
+          timeRangeBytes == null ? null : TimeRangeTracker.parseFrom(timeRangeBytes);
+        if (timeRangeTracker != null && timeRangeTracker.getMin() < cutOffTimestamp) {
+          if (timeRangeTracker.getMax() > cutOffTimestamp) {
+            // Found at least one file crossing the cutOffTimestamp
+            return true;
+          } else {
+            countLower++;
+          }
+        } else {
+          countHigher++;
+        }
+ hdfsBlocksDistribution.add(f.getHDFSBlockDistribution());
+ }
+      // If no file crosses the cutOffTimestamp, we still have to compact when either tier
+      // contains more than one file.
+ if (countLower > 1 || countHigher > 1) {
+ return true;
+ }
+ return checkBlockLocality(hdfsBlocksDistribution);
+ }
+ return false;
+ }
+
+}
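
To make the tiering rules in the javadoc above concrete, the following self-contained sketch
(not part of the patch) reproduces the boundary computation of getCompactBoundariesForMajor
with plain longs in place of store file metadata:

  import java.util.ArrayList;
  import java.util.List;

  public class CustomTierBoundariesDemo {
    // Always start at Long.MIN_VALUE; add the observed minimum when some data is older than
    // the cut-off, and add the cut-off itself only when the data straddles it.
    static List<Long> boundaries(long min, long max, long cutOffTimestamp) {
      List<Long> boundaries = new ArrayList<>();
      boundaries.add(Long.MIN_VALUE);
      if (min < cutOffTimestamp) {
        boundaries.add(min);
        if (max > cutOffTimestamp) {
          boundaries.add(cutOffTimestamp);
        }
      }
      return boundaries;
    }

    public static void main(String[] args) {
      long cutOff = 1000L;
      System.out.println(boundaries(2000L, 3000L, cutOff)); // all young: [MIN_VALUE]
      System.out.println(boundaries(10L, 20L, cutOff));     // all old: [MIN_VALUE, 10]
      System.out.println(boundaries(10L, 2000L, cutOff));   // straddling: [MIN_VALUE, 10, 1000]
    }
  }

This matches the test expectations further down: one boundary when no file is older than the
age limit, two when all files are, and three when at least one file crosses it.
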
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomTieredCompactor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomTieredCompactor.java
new file mode 100644
index 00000000000..47e4e142bda
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomTieredCompactor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public class CustomTieredCompactor extends DateTieredCompactor {
+
+ public static final String TIERING_VALUE_PROVIDER =
+ "hbase.hstore.custom-tiering-value.provider.class";
+ private TieringValueProvider tieringValueProvider;
+
+ public CustomTieredCompactor(Configuration conf, HStore store) throws
IOException {
+ super(conf, store);
+ String className =
+ conf.get(TIERING_VALUE_PROVIDER,
CustomCellTieringValueProvider.class.getName());
+ try {
+ tieringValueProvider =
+ (TieringValueProvider)
Class.forName(className).getConstructor().newInstance();
+ tieringValueProvider.init(conf);
+ } catch (Exception e) {
+ throw new IOException("Unable to load configured tiering value provider
'" + className + "'",
+ e);
+ }
+ }
+
+ @Override
+ protected List<ExtendedCell> decorateCells(List<ExtendedCell> cells) {
+ return tieringValueProvider.decorateCells(cells);
+ }
+
+ @Override
+ protected DateTieredMultiFileWriter createMultiWriter(final
CompactionRequestImpl request,
+ final List<Long> lowerBoundaries, final Map<Long, String>
lowerBoundariesPolicies) {
+ return new CustomTieringMultiFileWriter(lowerBoundaries,
lowerBoundariesPolicies,
+ needEmptyFile(request),
CustomTieredCompactor.this.tieringValueProvider::getTieringValue);
+ }
+
+ public interface TieringValueProvider {
+
+ void init(Configuration configuration) throws Exception;
+
+ default List<ExtendedCell> decorateCells(List<ExtendedCell> cells) {
+ return cells;
+ }
+
+ long getTieringValue(ExtendedCell cell);
+ }
+
+}
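
As a usage illustration (not part of the patch), a deployment can plug its own provider in via
the hbase.hstore.custom-tiering-value.provider.class property. The class below is a hypothetical
example that tiers purely on cell timestamps, assuming only the TieringValueProvider interface
defined above; it keeps the default decorateCells since no tags are needed.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.ExtendedCell;
  import org.apache.hadoop.hbase.regionserver.compactions.CustomTieredCompactor.TieringValueProvider;

  public class TimestampTieringValueProvider implements TieringValueProvider {

    @Override
    public void init(Configuration configuration) {
      // Nothing to configure in this sketch.
    }

    @Override
    public long getTieringValue(ExtendedCell cell) {
      // Use the cell timestamp directly as the tiering value.
      return cell.getTimestamp();
    }
  }

Note the provider needs a no-argument constructor, since CustomTieredCompactor instantiates it
reflectively with getConstructor().newInstance().
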
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
index 9dbe9aae9cf..2cce0d67d77 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -66,7 +67,7 @@ public class DateTieredCompactionPolicy extends
SortedCompactionPolicy {
private static final Logger LOG =
LoggerFactory.getLogger(DateTieredCompactionPolicy.class);
- private final RatioBasedCompactionPolicy compactionPolicyPerWindow;
+ protected final RatioBasedCompactionPolicy compactionPolicyPerWindow;
private final CompactionWindowFactory windowFactory;
@@ -108,9 +109,8 @@ public class DateTieredCompactionPolicy extends
SortedCompactionPolicy {
}
}
- @Override
- public boolean shouldPerformMajorCompaction(Collection<HStoreFile>
filesToCompact)
- throws IOException {
+ protected boolean isMajorCompactionTime(Collection<HStoreFile>
filesToCompact, long now,
+ long lowestModificationTime) throws IOException {
long mcTime = getNextMajorCompactTime(filesToCompact);
if (filesToCompact == null || mcTime == 0) {
if (LOG.isDebugEnabled()) {
@@ -118,58 +118,40 @@ public class DateTieredCompactionPolicy extends
SortedCompactionPolicy {
}
return false;
}
-
// TODO: Use better method for determining stamp of last major (HBASE-2990)
- long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
- long now = EnvironmentEdgeManager.currentTime();
- if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) {
+ if (lowestModificationTime <= 0L || lowestModificationTime >= (now -
mcTime)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("lowTimestamp: " + lowTimestamp + " lowTimestamp: " +
lowTimestamp + " now: "
- + now + " mcTime: " + mcTime);
+        LOG.debug("lowestModificationTime: " + lowestModificationTime + " now: " + now
+          + " mcTime: " + mcTime);
}
return false;
}
+ return true;
+ }
- long cfTTL = this.storeConfigInfo.getStoreFileTtl();
- HDFSBlocksDistribution hdfsBlocksDistribution = new
HDFSBlocksDistribution();
- List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, now);
- boolean[] filesInWindow = new boolean[boundaries.size()];
-
- for (HStoreFile file : filesToCompact) {
- OptionalLong minTimestamp = file.getMinimumTimestamp();
- long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong()
: Long.MIN_VALUE;
- if (cfTTL != Long.MAX_VALUE && oldest >= cfTTL) {
- LOG.debug("Major compaction triggered on store " + this + "; for TTL
maintenance");
- return true;
- }
- if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) {
- LOG.debug("Major compaction triggered on store " + this
- + ", because there are new files and time since last major
compaction "
- + (now - lowTimestamp) + "ms");
- return true;
- }
+ protected boolean checkForTtl(long ttl, HStoreFile file) {
+ OptionalLong minTimestamp = file.getMinimumTimestamp();
+ long oldest = minTimestamp.isPresent()
+ ? EnvironmentEdgeManager.currentTime() - minTimestamp.getAsLong()
+ : Long.MIN_VALUE;
+ if (ttl != Long.MAX_VALUE && oldest >= ttl) {
+ LOG.debug("Major compaction triggered on store " + this + "; for TTL
maintenance");
+ return true;
+ }
+ return false;
+ }
- int lowerWindowIndex =
- Collections.binarySearch(boundaries,
minTimestamp.orElse(Long.MAX_VALUE));
- int upperWindowIndex =
- Collections.binarySearch(boundaries,
file.getMaximumTimestamp().orElse(Long.MAX_VALUE));
- // Handle boundary conditions and negative values of binarySearch
- lowerWindowIndex = (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex +
2) : lowerWindowIndex;
- upperWindowIndex = (upperWindowIndex < 0) ? Math.abs(upperWindowIndex +
2) : upperWindowIndex;
- if (lowerWindowIndex != upperWindowIndex) {
- LOG.debug("Major compaction triggered on store " + this + "; because
file " + file.getPath()
- + " has data with timestamps cross window boundaries");
- return true;
- } else if (filesInWindow[upperWindowIndex]) {
- LOG.debug("Major compaction triggered on store " + this
- + "; because there are more than one file in some windows");
- return true;
- } else {
- filesInWindow[upperWindowIndex] = true;
- }
- hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
+ protected boolean isMajorOrBulkloadResult(HStoreFile file, long timeDiff) {
+ if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) {
+ LOG.debug("Major compaction triggered on store " + this
+ + ", because there are new files and time since last major compaction
" + timeDiff + "ms");
+ return true;
}
+ return false;
+ }
+ protected boolean checkBlockLocality(HDFSBlocksDistribution
hdfsBlocksDistribution)
+ throws UnknownHostException {
float blockLocalityIndex = hdfsBlocksDistribution
.getBlockLocalityIndex(DNS.getHostname(comConf.conf,
DNS.ServerType.REGIONSERVER));
if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
@@ -178,9 +160,55 @@ public class DateTieredCompactionPolicy extends
SortedCompactionPolicy {
+ " (min " + comConf.getMinLocalityToForceCompact() + ")");
return true;
}
+ return false;
+ }
- LOG.debug(
- "Skipping major compaction of " + this + ", because the files are
already major compacted");
+ @Override
+ public boolean shouldPerformMajorCompaction(Collection<HStoreFile>
filesToCompact)
+ throws IOException {
+ long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
+ long now = EnvironmentEdgeManager.currentTime();
+ if (isMajorCompactionTime(filesToCompact, now, lowTimestamp)) {
+ long cfTTL = this.storeConfigInfo.getStoreFileTtl();
+ HDFSBlocksDistribution hdfsBlocksDistribution = new
HDFSBlocksDistribution();
+ List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact,
now);
+ boolean[] filesInWindow = new boolean[boundaries.size()];
+ for (HStoreFile file : filesToCompact) {
+ OptionalLong minTimestamp = file.getMinimumTimestamp();
+ if (checkForTtl(cfTTL, file)) {
+ return true;
+ }
+ if (isMajorOrBulkloadResult(file, now - lowTimestamp)) {
+ return true;
+ }
+ int lowerWindowIndex =
+ Collections.binarySearch(boundaries,
minTimestamp.orElse(Long.MAX_VALUE));
+ int upperWindowIndex =
+ Collections.binarySearch(boundaries,
file.getMaximumTimestamp().orElse(Long.MAX_VALUE));
+ // Handle boundary conditions and negative values of binarySearch
+ lowerWindowIndex =
+ (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) :
lowerWindowIndex;
+ upperWindowIndex =
+ (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 2) :
upperWindowIndex;
+ if (lowerWindowIndex != upperWindowIndex) {
+          LOG.debug("Major compaction triggered on store " + this + "; because file "
+            + file.getPath() + " has data with timestamps crossing window boundaries");
+ return true;
+ } else if (filesInWindow[upperWindowIndex]) {
+ LOG.debug("Major compaction triggered on store " + this
+          LOG.debug("Major compaction triggered on store " + this
+            + "; because there is more than one file in some windows");
+ } else {
+ filesInWindow[upperWindowIndex] = true;
+ }
+ hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
+ }
+ if (checkBlockLocality(hdfsBlocksDistribution)) {
+ return true;
+ }
+ LOG.debug(
+ "Skipping major compaction of " + this + ", because the files are
already major compacted");
+ }
return false;
}
@@ -296,7 +324,8 @@ public class DateTieredCompactionPolicy extends
SortedCompactionPolicy {
/**
* Return a list of boundaries for multiple compaction output in ascending
order.
*/
- private List<Long> getCompactBoundariesForMajor(Collection<HStoreFile>
filesToCompact, long now) {
+ protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile>
filesToCompact,
+ long now) {
long minTimestamp = filesToCompact.stream()
.mapToLong(f ->
f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min().orElse(Long.MAX_VALUE);
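
One step of the refactored shouldPerformMajorCompaction above deserves a note: the binarySearch
index normalization that maps a file's timestamp to its compaction window. A small
self-contained sketch (not part of the patch, names hypothetical) of that arithmetic:

  import java.util.Arrays;
  import java.util.Collections;
  import java.util.List;

  public class WindowIndexDemo {
    // For a value absent from the sorted boundary list, Collections.binarySearch returns
    // -(insertionPoint) - 1, so Math.abs(idx + 2) == insertionPoint - 1, which is exactly
    // the index of the window the value falls into.
    static int windowIndex(List<Long> boundaries, long value) {
      int idx = Collections.binarySearch(boundaries, value);
      return (idx < 0) ? Math.abs(idx + 2) : idx;
    }

    public static void main(String[] args) {
      List<Long> boundaries = Arrays.asList(Long.MIN_VALUE, 100L, 200L);
      System.out.println(windowIndex(boundaries, 150L)); // 1: between 100 and 200
      System.out.println(windowIndex(boundaries, 250L)); // 2: past the last boundary
      System.out.println(windowIndex(boundaries, 100L)); // 1: exact hit on a boundary
    }
  }
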
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index b5911b0cec4..9cef2ebc314 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -46,7 +46,7 @@ public class DateTieredCompactor extends
AbstractMultiOutputCompactor<DateTiered
super(conf, store);
}
- private boolean needEmptyFile(CompactionRequestImpl request) {
+ protected boolean needEmptyFile(CompactionRequestImpl request) {
// if we are going to compact the last N files, then we need to emit an
empty file to retain the
// maxSeqId if we haven't written out anything.
OptionalLong maxSeqId =
StoreUtils.getMaxSequenceIdInList(request.getFiles());
@@ -70,14 +70,20 @@ public class DateTieredCompactor extends
AbstractMultiOutputCompactor<DateTiered
public DateTieredMultiFileWriter createWriter(InternalScanner scanner,
FileDetails fd,
boolean shouldDropBehind, boolean major, Consumer<Path>
writerCreationTracker)
throws IOException {
- DateTieredMultiFileWriter writer = new
DateTieredMultiFileWriter(lowerBoundaries,
- lowerBoundariesPolicies, needEmptyFile(request));
+ DateTieredMultiFileWriter writer =
+ createMultiWriter(request, lowerBoundaries,
lowerBoundariesPolicies);
initMultiWriter(writer, scanner, fd, shouldDropBehind, major,
writerCreationTracker);
return writer;
}
}, throughputController, user);
}
+ protected DateTieredMultiFileWriter createMultiWriter(final
CompactionRequestImpl request,
+ final List<Long> lowerBoundaries, final Map<Long, String>
lowerBoundariesPolicies) {
+ return new DateTieredMultiFileWriter(lowerBoundaries,
lowerBoundariesPolicies,
+ needEmptyFile(request), c -> c.getTimestamp());
+ }
+
@Override
protected List<Path> commitWriter(DateTieredMultiFileWriter writer,
FileDetails fd,
CompactionRequestImpl request) throws IOException {
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java
index f031a96d15f..153ad50419b 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -85,7 +86,7 @@ public class TestHFileInlineToRootChunkConversion {
hfw.append(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(k)
.setFamily(HConstants.EMPTY_BYTE_ARRAY).setQualifier(HConstants.EMPTY_BYTE_ARRAY)
.setTimestamp(HConstants.LATEST_TIMESTAMP).setType(KeyValue.Type.Maximum.getCode())
- .setValue(v).build());
+ .setValue(v).setType(Cell.Type.Put).build());
}
hfw.close();
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java
similarity index 89%
copy from
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
copy to
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java
index 37e0fe98e7d..b01717dfa1f 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java
@@ -79,35 +79,37 @@ import org.slf4j.LoggerFactory;
/**
* This class is used to test the functionality of the DataTieringManager.
*
- * The mock online regions are stored in {@link TestDataTieringManager#testOnlineRegions}.
- * For all tests, the setup of {@link TestDataTieringManager#testOnlineRegions} occurs only once.
- * Please refer to {@link TestDataTieringManager#setupOnlineRegions()} for the structure.
- * Additionally, a list of all store files is maintained in {@link TestDataTieringManager#hStoreFiles}.
+ * The mock online regions are stored in
+ * {@link TestCustomCellDataTieringManager#testOnlineRegions}. For all tests, the setup of
+ * {@link TestCustomCellDataTieringManager#testOnlineRegions} occurs only once. Please refer to
+ * {@link TestCustomCellDataTieringManager#setupOnlineRegions()} for the structure. Additionally,
+ * a list of all store files is maintained in {@link TestCustomCellDataTieringManager#hStoreFiles}.
  * The characteristics of these store files are listed below:
- * @formatter:off ## HStoreFile Information
- *
+ * @formatter:off
+ * ## HStoreFile Information
  * | HStoreFile       | Region             | Store               | DataTiering           | isHot |
  * |------------------|--------------------|---------------------|-----------------------|-------|
- * | hStoreFile0      | region1            | hStore11            | TIME_RANGE            | true  |
+ * | hStoreFile0      | region1            | hStore11            | CUSTOM_CELL_VALUE     | true  |
  * | hStoreFile1      | region1            | hStore12            | NONE                  | true  |
- * | hStoreFile2      | region2            | hStore21            | TIME_RANGE            | true  |
+ * | hStoreFile2      | region2            | hStore21            | CUSTOM_CELL_VALUE     | true  |
- * | hStoreFile3      | region2            | hStore22            | TIME_RANGE            | false |
+ * | hStoreFile3      | region2            | hStore22            | CUSTOM_CELL_VALUE     | false |
* @formatter:on
*/
@Category({ RegionServerTests.class, SmallTests.class })
-public class TestDataTieringManager {
+public class TestCustomCellDataTieringManager {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestDataTieringManager.class);
+ HBaseClassTestRule.forClass(TestCustomCellDataTieringManager.class);
- private static final Logger LOG =
LoggerFactory.getLogger(TestDataTieringManager.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(TestCustomCellDataTieringManager.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final long DAY = 24 * 60 * 60 * 1000;
private static Configuration defaultConf;
private static FileSystem fs;
- private static BlockCache blockCache;
+ private BlockCache blockCache;
private static CacheConfig cacheConf;
private static Path testDir;
private static final Map<String, HRegion> testOnlineRegions = new
HashMap<>();
@@ -124,10 +126,10 @@ public class TestDataTieringManager {
@BeforeClass
public static void setupBeforeClass() throws Exception {
- testDir =
TEST_UTIL.getDataTestDir(TestDataTieringManager.class.getSimpleName());
+ testDir =
TEST_UTIL.getDataTestDir(TestCustomCellDataTieringManager.class.getSimpleName());
defaultConf = TEST_UTIL.getConfiguration();
updateCommonConfigurations();
- assertTrue(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
+ DataTieringManager.instantiate(defaultConf, testOnlineRegions);
dataTieringManager = DataTieringManager.getInstance();
rowKeyString = "";
}
@@ -227,13 +229,14 @@ public class TestDataTieringManager {
   // Test with a filename where the corresponding HStoreFile is not present
hFilePath = new Path(hStoreFiles.get(0).getPath().getParent(),
"incorrectFileName");
- testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath,
true);
+ testDataTieringMethodWithPathExpectingException(methodCallerWithPath,
hFilePath,
+ new DataTieringException("Store file corresponding to " + hFilePath + "
doesn't exist"));
}
@Test
public void testPrefetchWhenDataTieringEnabled() throws IOException {
setPrefetchBlocksOnOpen();
- initializeTestEnvironment();
+ this.blockCache = initializeTestEnvironment();
// Evict blocks from cache by closing the files and passing evict on close.
// Then initialize the reader again. Since Prefetch on open is set to
true, it should prefetch
// those blocks.
@@ -283,27 +286,26 @@ public class TestDataTieringManager {
@Test
public void testCacheCompactedBlocksOnWriteDataTieringDisabled() throws
IOException {
setCacheCompactBlocksOnWrite();
- initializeTestEnvironment();
-
- HRegion region = createHRegion("table3");
+ this.blockCache = initializeTestEnvironment();
+ HRegion region = createHRegion("table3", this.blockCache);
testCacheCompactedBlocksOnWrite(region, true);
}
@Test
public void testCacheCompactedBlocksOnWriteWithHotData() throws IOException {
setCacheCompactBlocksOnWrite();
- initializeTestEnvironment();
-
- HRegion region = createHRegion("table3",
getConfWithTimeRangeDataTieringEnabled(5 * DAY));
+ this.blockCache = initializeTestEnvironment();
+ HRegion region =
+ createHRegion("table3", getConfWithCustomCellDataTieringEnabled(5 *
DAY), this.blockCache);
testCacheCompactedBlocksOnWrite(region, true);
}
@Test
public void testCacheCompactedBlocksOnWriteWithColdData() throws IOException
{
setCacheCompactBlocksOnWrite();
- initializeTestEnvironment();
-
- HRegion region = createHRegion("table3",
getConfWithTimeRangeDataTieringEnabled(DAY));
+ this.blockCache = initializeTestEnvironment();
+ HRegion region =
+ createHRegion("table3", getConfWithCustomCellDataTieringEnabled(DAY),
this.blockCache);
testCacheCompactedBlocksOnWrite(region, false);
}
@@ -338,12 +340,11 @@ public class TestDataTieringManager {
Path storeDir = hStore.getStoreContext().getFamilyStoreDirectoryPath();
Configuration configuration = hStore.getReadOnlyConfiguration();
- createHStoreFile(storeDir, configuration, currentTime - 2 * DAY,
- hStore.getHRegion().getRegionFileSystem());
- createHStoreFile(storeDir, configuration, currentTime - 3 * DAY,
- hStore.getHRegion().getRegionFileSystem());
- createHStoreFile(storeDir, configuration, currentTime - 4 * DAY,
- hStore.getHRegion().getRegionFileSystem());
+ HRegionFileSystem regionFS = hStore.getHRegion().getRegionFileSystem();
+
+ createHStoreFile(storeDir, configuration, currentTime - 2 * DAY, regionFS);
+ createHStoreFile(storeDir, configuration, currentTime - 3 * DAY, regionFS);
+ createHStoreFile(storeDir, configuration, currentTime - 4 * DAY, regionFS);
}
@Test
@@ -571,48 +572,37 @@ public class TestDataTieringManager {
@Test
public void testCacheConfigShouldCacheFile() throws Exception {
- // Evict the files from cache.
- for (HStoreFile file : hStoreFiles) {
- file.closeStoreFile(true);
- }
+ initializeTestEnvironment();
// Verify that the API shouldCacheFileBlock returns the result correctly.
// hStoreFiles[0], hStoreFiles[1], hStoreFiles[2] are hot files.
// hStoreFiles[3] is a cold file.
- try {
- assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
- hStoreFiles.get(0).getFileInfo().getHFileInfo(),
- hStoreFiles.get(0).getFileInfo().getConf()));
- assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
- hStoreFiles.get(1).getFileInfo().getHFileInfo(),
- hStoreFiles.get(1).getFileInfo().getConf()));
- assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
- hStoreFiles.get(2).getFileInfo().getHFileInfo(),
- hStoreFiles.get(2).getFileInfo().getConf()));
- assertFalse(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
- hStoreFiles.get(3).getFileInfo().getHFileInfo(),
- hStoreFiles.get(3).getFileInfo().getConf()));
- } finally {
- for (HStoreFile file : hStoreFiles) {
- file.initReader();
- }
- }
+ assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
+ hStoreFiles.get(0).getFileInfo().getHFileInfo(),
hStoreFiles.get(0).getFileInfo().getConf()));
+ assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
+ hStoreFiles.get(1).getFileInfo().getHFileInfo(),
hStoreFiles.get(1).getFileInfo().getConf()));
+ assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
+ hStoreFiles.get(2).getFileInfo().getHFileInfo(),
hStoreFiles.get(2).getFileInfo().getConf()));
+ assertFalse(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA,
+ hStoreFiles.get(3).getFileInfo().getHFileInfo(),
hStoreFiles.get(3).getFileInfo().getConf()));
}
@Test
public void testCacheOnReadColdFile() throws Exception {
+ this.blockCache = initializeTestEnvironment();
// hStoreFiles[3] is a cold file. the blocks should not get loaded after a
readBlock call.
HStoreFile hStoreFile = hStoreFiles.get(3);
BlockCacheKey cacheKey = new BlockCacheKey(hStoreFile.getPath(), 0, true,
BlockType.DATA);
- testCacheOnRead(hStoreFile, cacheKey, 23025, false);
+ testCacheOnRead(hStoreFile, cacheKey, -1, false);
}
@Test
public void testCacheOnReadHotFile() throws Exception {
+ this.blockCache = initializeTestEnvironment();
// hStoreFiles[0] is a hot file. the blocks should get loaded after a
readBlock call.
HStoreFile hStoreFile = hStoreFiles.get(0);
BlockCacheKey cacheKey =
new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA);
- testCacheOnRead(hStoreFile, cacheKey, 23025, true);
+ testCacheOnRead(hStoreFile, cacheKey, -1, true);
}
private void testCacheOnRead(HStoreFile hStoreFile, BlockCacheKey key, long
onDiskBlockSize,
@@ -621,7 +611,7 @@ public class TestDataTieringManager {
hStoreFile.getReader().getHFileReader().readBlock(key.getOffset(),
onDiskBlockSize, true, false,
false, false, key.getBlockType(), DataBlockEncoding.NONE);
// Validate that the hot block gets cached and cold block is not cached.
- HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false,
false, BlockType.DATA);
+ HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false,
false);
if (expectedCached) {
assertNotNull(block);
} else {
@@ -643,7 +633,8 @@ public class TestDataTieringManager {
numColdBlocks++;
}
} catch (Exception e) {
- fail("Unexpected exception!");
+ LOG.debug("Error validating priority for key {}", key, e);
+ fail(e.getMessage());
}
}
assertEquals(expectedHotBlocks, numHotBlocks);
@@ -702,26 +693,28 @@ public class TestDataTieringManager {
testDataTieringMethodWithKey(caller, key, expectedResult, null);
}
- private static void initializeTestEnvironment() throws IOException {
- setupFileSystemAndCache();
- setupOnlineRegions();
+ private static BlockCache initializeTestEnvironment() throws IOException {
+ BlockCache blockCache = setupFileSystemAndCache();
+ setupOnlineRegions(blockCache);
+ return blockCache;
}
- private static void setupFileSystemAndCache() throws IOException {
+ private static BlockCache setupFileSystemAndCache() throws IOException {
fs = HFileSystem.get(defaultConf);
- blockCache = BlockCacheFactory.createBlockCache(defaultConf);
+ BlockCache blockCache = BlockCacheFactory.createBlockCache(defaultConf);
cacheConf = new CacheConfig(defaultConf, blockCache);
+ return blockCache;
}
- private static void setupOnlineRegions() throws IOException {
+ private static void setupOnlineRegions(BlockCache blockCache) throws
IOException {
testOnlineRegions.clear();
hStoreFiles.clear();
long day = 24 * 60 * 60 * 1000;
long currentTime = System.currentTimeMillis();
- HRegion region1 = createHRegion("table1");
+ HRegion region1 = createHRegion("table1", blockCache);
- HStore hStore11 = createHStore(region1, "cf1",
getConfWithTimeRangeDataTieringEnabled(day));
+ HStore hStore11 = createHStore(region1, "cf1",
getConfWithCustomCellDataTieringEnabled(day));
hStoreFiles.add(createHStoreFile(hStore11.getStoreContext().getFamilyStoreDirectoryPath(),
hStore11.getReadOnlyConfiguration(), currentTime,
region1.getRegionFileSystem()));
hStore11.refreshStoreFiles();
@@ -733,8 +726,8 @@ public class TestDataTieringManager {
region1.stores.put(Bytes.toBytes("cf1"), hStore11);
region1.stores.put(Bytes.toBytes("cf2"), hStore12);
- HRegion region2 =
- createHRegion("table2", getConfWithTimeRangeDataTieringEnabled((long)
(2.5 * day)));
+ HRegion region2 = createHRegion("table2",
+ getConfWithCustomCellDataTieringEnabled((long) (2.5 * day)), blockCache);
HStore hStore21 = createHStore(region2, "cf1");
hStoreFiles.add(createHStoreFile(hStore21.getStoreContext().getFamilyStoreDirectoryPath(),
@@ -756,11 +749,12 @@ public class TestDataTieringManager {
testOnlineRegions.put(region2.getRegionInfo().getEncodedName(), region2);
}
- private static HRegion createHRegion(String table) throws IOException {
- return createHRegion(table, defaultConf);
+ private static HRegion createHRegion(String table, BlockCache blockCache)
throws IOException {
+ return createHRegion(table, defaultConf, blockCache);
}
- private static HRegion createHRegion(String table, Configuration conf)
throws IOException {
+ private static HRegion createHRegion(String table, Configuration conf,
BlockCache blockCache)
+ throws IOException {
TableName tableName = TableName.valueOf(table);
TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
@@ -799,15 +793,8 @@ public class TestDataTieringManager {
return new HStore(region, columnFamilyDescriptor, conf, false);
}
- private static Configuration getConfWithTimeRangeDataTieringEnabled(long
hotDataAge) {
- Configuration conf = new Configuration(defaultConf);
- conf.set(DataTieringManager.DATATIERING_KEY,
DataTieringType.TIME_RANGE.name());
- conf.set(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY,
String.valueOf(hotDataAge));
- return conf;
- }
-
private static HStoreFile createHStoreFile(Path storeDir, Configuration
conf, long timestamp,
- HRegionFileSystem regionFs) throws IOException {
+ HRegionFileSystem regionFs) throws IOException {
String columnFamily = storeDir.getName();
StoreFileWriter storeFileWriter = new StoreFileWriter.Builder(conf,
cacheConf, fs)
@@ -815,11 +802,17 @@ public class TestDataTieringManager {
writeStoreFileRandomData(storeFileWriter, Bytes.toBytes(columnFamily),
timestamp);
- StoreContext storeContext =
- StoreContext.getBuilder().withRegionFileSystem(regionFs).build();
-
+ StoreContext storeContext =
StoreContext.getBuilder().withRegionFileSystem(regionFs).build();
StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true,
storeContext);
- return new HStoreFile(fs, storeFileWriter.getPath(), conf, cacheConf,
BloomType.NONE, true, sft);
+ return new HStoreFile(fs, storeFileWriter.getPath(), conf, cacheConf,
BloomType.NONE, true,
+ sft);
+ }
+
+ private static Configuration getConfWithCustomCellDataTieringEnabled(long
hotDataAge) {
+ Configuration conf = new Configuration(defaultConf);
+ conf.set(DataTieringManager.DATATIERING_KEY,
DataTieringType.CUSTOM.name());
+ conf.set(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY,
String.valueOf(hotDataAge));
+ return conf;
}
/**
@@ -839,6 +832,10 @@ public class TestDataTieringManager {
}
} finally {
writer.appendTrackedTimestampsToMetadata();
+ TimeRangeTracker timeRangeTracker =
TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
+ timeRangeTracker.setMin(timestamp);
+ timeRangeTracker.setMax(timestamp);
+ writer.appendCustomCellTimestampsToMetadata(timeRangeTracker);
writer.close();
}
}
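
For context (not part of the patch), the configuration that the test helper
getConfWithCustomCellDataTieringEnabled builds reduces to two properties. A minimal sketch,
assuming only the DataTieringManager keys and the DataTieringType.CUSTOM value used above:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.regionserver.DataTieringManager;
  import org.apache.hadoop.hbase.regionserver.DataTieringType;

  public class CustomTieringConfDemo {
    // Enables CUSTOM data tiering with the given hot-data window, mirroring the test helper.
    public static Configuration customTieringConf(Configuration base, long hotDataAgeMillis) {
      Configuration conf = new Configuration(base);
      conf.set(DataTieringManager.DATATIERING_KEY, DataTieringType.CUSTOM.name());
      conf.set(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(hotDataAgeMillis));
      return conf;
    }
  }
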
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellTieredCompactionPolicy.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellTieredCompactionPolicy.java
new file mode 100644
index 00000000000..c89e9919717
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellTieredCompactionPolicy.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static
org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import
org.apache.hadoop.hbase.regionserver.compactions.CustomDateTieredCompactionPolicy;
+import
org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
+import
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestCustomCellTieredCompactionPolicy {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestCustomCellTieredCompactionPolicy.class);
+
+ private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+ public static final byte[] FAMILY = Bytes.toBytes("cf");
+
+ private HStoreFile createFile(Path file, long minValue, long maxValue, long
size, int seqId)
+ throws IOException {
+ return createFile(mockRegionInfo(), file, minValue, maxValue, size, seqId,
0);
+ }
+
+ private HStoreFile createFile(RegionInfo regionInfo, Path file, long
minValue, long maxValue,
+ long size, int seqId, long ageInDisk) throws IOException {
+ FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+ HRegionFileSystem regionFileSystem =
+ new HRegionFileSystem(TEST_UTIL.getConfiguration(), fs, file,
regionInfo);
+ StoreContext ctx = new StoreContext.Builder()
+
.withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build())
+ .withRegionFileSystem(regionFileSystem).build();
+ StoreFileTrackerForTest sftForTest =
+ new StoreFileTrackerForTest(TEST_UTIL.getConfiguration(), true, ctx);
+ MockHStoreFile msf =
+ new MockHStoreFile(TEST_UTIL, file, size, ageInDisk, false, (long)
seqId, sftForTest);
+ TimeRangeTracker timeRangeTracker =
TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
+ timeRangeTracker.setMin(minValue);
+ timeRangeTracker.setMax(maxValue);
+ msf.setMetadataValue(CUSTOM_TIERING_TIME_RANGE,
TimeRangeTracker.toByteArray(timeRangeTracker));
+ return msf;
+ }
+
+ private CustomDateTieredCompactionPolicy mockAndCreatePolicy() throws
Exception {
+ RegionInfo mockedRegionInfo = mockRegionInfo();
+ return mockAndCreatePolicy(mockedRegionInfo);
+ }
+
+ private CustomDateTieredCompactionPolicy mockAndCreatePolicy(RegionInfo
regionInfo)
+ throws Exception {
+ StoreConfigInformation mockedStoreConfig =
mock(StoreConfigInformation.class);
+ when(mockedStoreConfig.getRegionInfo()).thenReturn(regionInfo);
+ CustomDateTieredCompactionPolicy policy =
+ new CustomDateTieredCompactionPolicy(TEST_UTIL.getConfiguration(),
mockedStoreConfig);
+ return policy;
+ }
+
+ private RegionInfo mockRegionInfo() {
+ RegionInfo mockedRegionInfo = mock(RegionInfo.class);
+ when(mockedRegionInfo.getEncodedName()).thenReturn("1234567890987654321");
+ return mockedRegionInfo;
+ }
+
+ private Path preparePath() throws Exception {
+ FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+ Path file =
+ new Path(TEST_UTIL.getDataTestDir(),
UUID.randomUUID().toString().replaceAll("-", ""));
+ fs.create(file);
+ return file;
+ }
+
+ @Test
+ public void testGetCompactBoundariesForMajorNoOld() throws Exception {
+ CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+ Path file = preparePath();
+ ArrayList<HStoreFile> files = new ArrayList<>();
+ files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+ EnvironmentEdgeManager.currentTime(), 1024, 0));
+ files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+ EnvironmentEdgeManager.currentTime(), 1024, 1));
+ assertEquals(1,
+ ((DateTieredCompactionRequest)
policy.selectMajorCompaction(files)).getBoundaries().size());
+ }
+
+ @Test
+ public void testGetCompactBoundariesForMajorAllOld() throws Exception {
+ CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+ Path file = preparePath();
+ ArrayList<HStoreFile> files = new ArrayList<>();
+    // The default cut-off age is 10 years, so files with these min/max values should land in the
+    // old tier
+ files.add(createFile(file, 0, 1, 1024, 0));
+ files.add(createFile(file, 2, 3, 1024, 1));
+ assertEquals(2,
+ ((DateTieredCompactionRequest)
policy.selectMajorCompaction(files)).getBoundaries().size());
+ }
+
+ @Test
+ public void testGetCompactBoundariesForMajorOneOnEachSide() throws Exception
{
+ CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+ Path file = preparePath();
+ ArrayList<HStoreFile> files = new ArrayList<>();
+ files.add(createFile(file, 0, 1, 1024, 0));
+ files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+ EnvironmentEdgeManager.currentTime(), 1024, 1));
+ assertEquals(3,
+ ((DateTieredCompactionRequest)
policy.selectMajorCompaction(files)).getBoundaries().size());
+ }
+
+ @Test
+ public void testGetCompactBoundariesForMajorOneCrossing() throws Exception {
+ CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+ Path file = preparePath();
+ ArrayList<HStoreFile> files = new ArrayList<>();
+ files.add(createFile(file, 0, EnvironmentEdgeManager.currentTime(), 1024,
0));
+ assertEquals(3,
+ ((DateTieredCompactionRequest)
policy.selectMajorCompaction(files)).getBoundaries().size());
+ }
+
+ @FunctionalInterface
+ interface PolicyValidator<T, U> {
+ void accept(T t, U u) throws Exception;
+ }
+
+ private void testShouldPerformMajorCompaction(long min, long max, int
numFiles,
+ PolicyValidator<CustomDateTieredCompactionPolicy, ArrayList<HStoreFile>>
validation)
+ throws Exception {
+ CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+ RegionInfo mockedRegionInfo = mockRegionInfo();
+ Path file = preparePath();
+ ArrayList<HStoreFile> files = new ArrayList<>();
+ ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(timeMachine);
+ for (int i = 0; i < numFiles; i++) {
+ MockHStoreFile mockedSFile = (MockHStoreFile)
createFile(mockedRegionInfo, file, min, max,
+ 1024, 0, HConstants.DEFAULT_MAJOR_COMPACTION_PERIOD);
+ mockedSFile.setIsMajor(true);
+ files.add(mockedSFile);
+ }
+ EnvironmentEdgeManager.reset();
+ validation.accept(policy, files);
+ }
+
+ @Test
+ public void testShouldPerformMajorCompactionOneFileCrossing() throws
Exception {
+ long max = EnvironmentEdgeManager.currentTime();
+ testShouldPerformMajorCompaction(0, max, 1,
+ (p, f) -> assertTrue(p.shouldPerformMajorCompaction(f)));
+ }
+
+ @Test
+ public void testShouldPerformMajorCompactionOneFileMinMaxLow() throws
Exception {
+ testShouldPerformMajorCompaction(0, 1, 1,
+ (p, f) -> assertFalse(p.shouldPerformMajorCompaction(f)));
+ }
+
+ @Test
+ public void testShouldPerformMajorCompactionOneFileMinMaxHigh() throws
Exception {
+ long currentTime = EnvironmentEdgeManager.currentTime();
+ testShouldPerformMajorCompaction(currentTime, currentTime, 1,
+ (p, f) -> assertFalse(p.shouldPerformMajorCompaction(f)));
+ }
+
+ @Test
+ public void testShouldPerformMajorCompactionTwoFilesMinMaxHigh() throws
Exception {
+ long currentTime = EnvironmentEdgeManager.currentTime();
+ testShouldPerformMajorCompaction(currentTime, currentTime, 2,
+ (p, f) -> assertTrue(p.shouldPerformMajorCompaction(f)));
+ }
+
+ @Test
+ public void testSelectMinorCompactionTwoFilesNoOld() throws Exception {
+ CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+ Path file = preparePath();
+ ArrayList<HStoreFile> files = new ArrayList<>();
+ files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+ EnvironmentEdgeManager.currentTime(), 1024, 0));
+ files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+ EnvironmentEdgeManager.currentTime(), 1024, 1));
+    // Shouldn't do a minor compaction, as the minimum number of files
+    // for minor compaction is 3
+ assertEquals(0, policy.selectMinorCompaction(files, true,
true).getFiles().size());
+ }
+
+ @Test
+ public void testSelectMinorCompactionThreeFilesNoOld() throws Exception {
+ CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+ Path file = preparePath();
+ ArrayList<HStoreFile> files = new ArrayList<>();
+ files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+ EnvironmentEdgeManager.currentTime(), 1024, 0));
+ files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+ EnvironmentEdgeManager.currentTime(), 1024, 1));
+ files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+ EnvironmentEdgeManager.currentTime(), 1024, 2));
+ assertEquals(3, policy.selectMinorCompaction(files, true,
true).getFiles().size());
+ }
+
+ @Test
+ public void testSelectMinorCompactionThreeFilesAllOld() throws Exception {
+ CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+ Path file = preparePath();
+ ArrayList<HStoreFile> files = new ArrayList<>();
+ files.add(createFile(file, 0, 1, 1024, 0));
+ files.add(createFile(file, 1, 2, 1024, 1));
+ files.add(createFile(file, 3, 4, 1024, 2));
+ assertEquals(3, policy.selectMinorCompaction(files, true,
true).getFiles().size());
+ }
+
+ @Test
+ public void testSelectMinorCompactionThreeFilesOneOldTwoNew() throws
Exception {
+ CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+ Path file = preparePath();
+ ArrayList<HStoreFile> files = new ArrayList<>();
+ files.add(createFile(file, 0, 1, 1024, 0));
+ files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+ EnvironmentEdgeManager.currentTime(), 1024, 1));
+ files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+ EnvironmentEdgeManager.currentTime(), 1024, 2));
+ assertEquals(3, policy.selectMinorCompaction(files, true,
true).getFiles().size());
+ }
+
+ @Test
+ public void testSelectMinorCompactionThreeFilesTwoOldOneNew() throws
Exception {
+ CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+ Path file = preparePath();
+ ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, 0, 1, 1024, 0));
+    files.add(createFile(file, 2, 3, 1024, 1));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 2));
+ assertEquals(3, policy.selectMinorCompaction(files, true,
true).getFiles().size());
+ }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
index 37e0fe98e7d..bf82a531f19 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
@@ -227,7 +227,8 @@ public class TestDataTieringManager {
   // Test with a filename where the corresponding HStoreFile is not present
hFilePath = new Path(hStoreFiles.get(0).getPath().getParent(),
"incorrectFileName");
- testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath,
true);
+ testDataTieringMethodWithPathExpectingException(methodCallerWithPath,
hFilePath,
+ new DataTieringException("Store file corresponding to " + hFilePath + "
doesn't exist"));
}
@Test
@@ -600,19 +601,21 @@ public class TestDataTieringManager {
@Test
public void testCacheOnReadColdFile() throws Exception {
+ initializeTestEnvironment();
// hStoreFiles[3] is a cold file. the blocks should not get loaded after a
readBlock call.
HStoreFile hStoreFile = hStoreFiles.get(3);
BlockCacheKey cacheKey = new BlockCacheKey(hStoreFile.getPath(), 0, true,
BlockType.DATA);
- testCacheOnRead(hStoreFile, cacheKey, 23025, false);
+ testCacheOnRead(hStoreFile, cacheKey, -1, false);
}
@Test
public void testCacheOnReadHotFile() throws Exception {
+ initializeTestEnvironment();
// hStoreFiles[0] is a hot file. the blocks should get loaded after a
readBlock call.
HStoreFile hStoreFile = hStoreFiles.get(0);
BlockCacheKey cacheKey =
new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA);
- testCacheOnRead(hStoreFile, cacheKey, 23025, true);
+ testCacheOnRead(hStoreFile, cacheKey, -1, true);
}
private void testCacheOnRead(HStoreFile hStoreFile, BlockCacheKey key, long
onDiskBlockSize,
@@ -806,8 +809,8 @@ public class TestDataTieringManager {
return conf;
}
- private static HStoreFile createHStoreFile(Path storeDir, Configuration
conf, long timestamp,
- HRegionFileSystem regionFs) throws IOException {
+ static HStoreFile createHStoreFile(Path storeDir, Configuration conf, long
timestamp,
+ HRegionFileSystem regionFs) throws IOException {
String columnFamily = storeDir.getName();
StoreFileWriter storeFileWriter = new StoreFileWriter.Builder(conf,
cacheConf, fs)
@@ -815,11 +818,11 @@ public class TestDataTieringManager {
writeStoreFileRandomData(storeFileWriter, Bytes.toBytes(columnFamily),
timestamp);
- StoreContext storeContext =
- StoreContext.getBuilder().withRegionFileSystem(regionFs).build();
+ StoreContext storeContext =
StoreContext.getBuilder().withRegionFileSystem(regionFs).build();
StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true,
storeContext);
- return new HStoreFile(fs, storeFileWriter.getPath(), conf, cacheConf,
BloomType.NONE, true, sft);
+ return new HStoreFile(fs, storeFileWriter.getPath(), conf, cacheConf,
BloomType.NONE, true,
+ sft);
}
/**
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java
new file mode 100644
index 00000000000..253bdb43567
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static
org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
+import static
org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieringValueProvider.TIERING_CELL_QUALIFIER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.CustomTieredStoreEngine;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestCustomCellTieredCompactor {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestCustomCellTieredCompactor.class);
+
+ public static final byte[] FAMILY = Bytes.toBytes("cf");
+
+ protected HBaseTestingUtil utility;
+
+ protected Admin admin;
+
+ @Before
+ public void setUp() throws Exception {
+ utility = new HBaseTestingUtil();
+
utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval",
10);
+ utility.startMiniCluster();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ utility.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testCustomCellTieredCompactor() throws Exception {
+ ColumnFamilyDescriptorBuilder clmBuilder =
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
+ clmBuilder.setValue("hbase.hstore.engine.class",
CustomTieredStoreEngine.class.getName());
+ clmBuilder.setValue(TIERING_CELL_QUALIFIER, "date");
+ TableName tableName = TableName.valueOf("testCustomCellTieredCompactor");
+ TableDescriptorBuilder tblBuilder =
TableDescriptorBuilder.newBuilder(tableName);
+ tblBuilder.setColumnFamily(clmBuilder.build());
+ utility.getAdmin().createTable(tblBuilder.build());
+ utility.waitTableAvailable(tableName);
+ Connection connection = utility.getConnection();
+ Table table = connection.getTable(tableName);
+ long recordTime = System.currentTimeMillis();
+ // write data and flush multiple store files:
+ for (int i = 0; i < 6; i++) {
+ List<Put> puts = new ArrayList<>(2);
+ Put put = new Put(Bytes.toBytes(i));
+ put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i));
+ put.addColumn(FAMILY, Bytes.toBytes("date"),
+ Bytes.toBytes(recordTime - (11L * 366L * 24L * 60L * 60L * 1000L)));
+ puts.add(put);
+ put = new Put(Bytes.toBytes(i + 1000));
+ put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i +
1000)));
+ put.addColumn(FAMILY, Bytes.toBytes("date"), Bytes.toBytes(recordTime));
+ puts.add(put);
+ table.put(puts);
+ utility.flush(tableName);
+ }
+ table.close();
+ long firstCompactionTime = System.currentTimeMillis();
+ utility.getAdmin().majorCompact(tableName);
+ Waiter.waitFor(utility.getConfiguration(), 5000,
+ () ->
utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(tableName)
+ > firstCompactionTime);
+ long numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    // The first major compaction cannot detect more than one tier: without the min/max tiering
+    // values in the file info of the selected files, CustomDateTieredCompactionPolicy has no way
+    // to calculate the proper boundaries.
+ assertEquals(1, numHFiles);
+
utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles()
+ .forEach(file -> {
+ byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
+ assertNotNull(rangeBytes);
+ try {
+ TimeRangeTracker timeRangeTracker =
TimeRangeTracker.parseFrom(rangeBytes);
+ assertEquals((recordTime - (11L * 366L * 24L * 60L * 60L * 1000L)),
+ timeRangeTracker.getMin());
+ assertEquals(recordTime, timeRangeTracker.getMax());
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ });
+ // now do major compaction again, to make sure we write two separate files
+ long secondCompactionTime = System.currentTimeMillis();
+ utility.getAdmin().majorCompact(tableName);
+ Waiter.waitFor(utility.getConfiguration(), 5000,
+ () ->
utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(tableName)
+ > secondCompactionTime);
+ numHFiles = utility.getNumHFiles(tableName, FAMILY);
+ assertEquals(2, numHFiles);
+
utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles()
+ .forEach(file -> {
+ byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
+ assertNotNull(rangeBytes);
+ try {
+ TimeRangeTracker timeRangeTracker =
TimeRangeTracker.parseFrom(rangeBytes);
+ assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax());
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ });
+ }
+}