This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch HBASE-29427
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 8510a8986db5678f9ac5c09b6a6acbeb69878246
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Fri Nov 8 10:07:00 2024 +0000

    HBASE-29413 Implement a custom qualifier tiered compaction

    Change-Id: I6eff2199afbc8c02426b07b696c18af258bcb36c
---
 .../main/java/org/apache/hadoop/hbase/TagType.java |   2 +
 .../hadoop/hbase/io/hfile/HFileWriterImpl.java     |  12 +-
 .../regionserver/CustomCellTieredStoreEngine.java  |   9 ++
 .../regionserver/CustomTieringMultiFileWriter.java |  73 ++++++++++
 .../regionserver/DateTieredMultiFileWriter.java    |  21 ++-
 .../hbase/regionserver/compactions/Compactor.java  |   5 +-
 .../CustomCellDateTieredCompactionPolicy.java      |  67 +++++----
 .../compactions/CustomCellTieredCompactor.java     |  80 ++++++++++-
 .../compactions/DateTieredCompactor.java           |  16 ++-
 .../compactions/TestCustomCellTieredCompactor.java | 150 +++++++++++++++++++++
 10 files changed, 396 insertions(+), 39 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index eb9a7f3eccc..811710991eb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -36,4 +36,6 @@ public final class TagType {
   // String based tag type used in replication
   public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
   public static final byte TTL_TAG_TYPE = (byte) 8;
+  // Tag carrying the custom cell tiering value for the row
+  public static final byte CELL_VALUE_TIERING_TAG_TYPE = (byte) 9;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 96bfe42f1fd..8d58032799d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -29,6 +29,8 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -127,6 +129,12 @@ public class HFileWriterImpl implements HFile.Writer {
   /** Cache configuration for caching data on write. */
   protected final CacheConfig cacheConf;
 
+  private Supplier<TimeRangeTracker> timeRangeToTrack;
+
+  public void setTimeRangeToTrack(Supplier<TimeRangeTracker> timeRangeToTrack) {
+    this.timeRangeToTrack = timeRangeToTrack;
+  }
+
   /**
    * Name for this object used when logging or in toString. Is either the result of a toString on
    * stream or else name of passed file Path.
@@ -186,7 +194,9 @@ public class HFileWriterImpl implements HFile.Writer {
     this.path = path;
     this.name = path != null ? path.getName() : outputStream.toString();
     this.hFileContext = fileContext;
+    // TODO: Move this back to the upper layer
     this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
+    this.timeRangeToTrack = () -> this.timeRangeTracker;
     DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
     if (encoding != DataBlockEncoding.NONE) {
       this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
@@ -588,7 +598,7 @@ public class HFileWriterImpl implements HFile.Writer {
   }
 
   private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
-    Optional<Boolean> result = cache.shouldCacheBlock(key, timeRangeTracker, conf);
+    Optional<Boolean> result = cache.shouldCacheBlock(key, timeRangeToTrack.get(), conf);
     return result.orElse(true);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java
index fb940f41155..4f061736612 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java
@@ -19,11 +19,12 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.regionserver.compactions.CustomCellDateTieredCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredCompactor;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import java.io.IOException;
 
 import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
+import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellDateTieredCompactionPolicy.TIERING_CELL_QUALIFIER;
 
 /**
  * Extension of {@link DateTieredStoreEngine} that uses a pluggable value provider for
@@ -41,6 +42,10 @@ public class CustomCellTieredStoreEngine extends DateTieredStoreEngine {
   protected void createComponents(Configuration conf, HStore store, CellComparator kvComparator)
     throws IOException {
     conf = new Configuration(conf);
+    String tieringQualifier = store.getReadOnlyConfiguration().get(TIERING_CELL_QUALIFIER);
+    if (tieringQualifier != null) {
+      conf.set(TIERING_CELL_QUALIFIER, tieringQualifier);
+    }
     conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
       CustomCellDateTieredCompactionPolicy.class.getName());
     createCompactionPolicy(conf, store);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java
new file mode 100644
index 00000000000..a09c45f837e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+@InterfaceAudience.Private
+public class CustomTieringMultiFileWriter extends DateTieredMultiFileWriter {
+
+  public static final byte[] TIERING_CELL_TIME_RANGE =
+    Bytes.toBytes("TIERING_CELL_TIME_RANGE");
+
+  private final NavigableMap<Long, TimeRangeTracker> lowerBoundary2TimeRangeTracker =
+    new TreeMap<>();
+
+  public CustomTieringMultiFileWriter(List<Long> lowerBoundaries,
+    Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile,
+    Function<ExtendedCell, Long> tieringFunction) {
+    super(lowerBoundaries, lowerBoundariesPolicies, needEmptyFile, tieringFunction);
+    for (Long lowerBoundary : lowerBoundaries) {
+      lowerBoundary2TimeRangeTracker.put(lowerBoundary, null);
+    }
+  }
+
+  @Override
+  public void append(ExtendedCell cell) throws IOException {
+    super.append(cell);
+    long tieringValue = tieringFunction.apply(cell);
+    Map.Entry<Long, TimeRangeTracker> entry =
+      lowerBoundary2TimeRangeTracker.floorEntry(tieringValue);
+    if (entry.getValue() == null) {
+      TimeRangeTracker timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
+      timeRangeTracker.setMin(tieringValue);
+      timeRangeTracker.setMax(tieringValue);
+      lowerBoundary2TimeRangeTracker.put(entry.getKey(), timeRangeTracker);
+      ((HFileWriterImpl) lowerBoundary2Writer.get(entry.getKey()).getLiveFileWriter())
+        .setTimeRangeToTrack(() -> timeRangeTracker);
+    } else {
+      TimeRangeTracker timeRangeTracker = entry.getValue();
+      if (timeRangeTracker.getMin() > tieringValue) {
+        timeRangeTracker.setMin(tieringValue);
+      }
+      if (timeRangeTracker.getMax() < tieringValue) {
+        timeRangeTracker.setMax(tieringValue);
+      }
+    }
+  }
+
+  @Override
+  public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
+    Collection<HStoreFile> storeFiles) throws IOException {
+    for (Map.Entry<Long, StoreFileWriter> entry : this.lowerBoundary2Writer.entrySet()) {
+      StoreFileWriter writer = entry.getValue();
+      if (writer != null) {
+        writer.appendFileInfo(TIERING_CELL_TIME_RANGE,
+          TimeRangeTracker.toByteArray(lowerBoundary2TimeRangeTracker.get(entry.getKey())));
+      }
+    }
+    return super.commitWriters(maxSeqId, majorCompaction, storeFiles);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
index b800178e8a2..5796d7c890e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.function.Function;
 import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -33,12 +35,14 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
 
-  private final NavigableMap<Long, StoreFileWriter> lowerBoundary2Writer = new TreeMap<>();
+  protected final NavigableMap<Long, StoreFileWriter> lowerBoundary2Writer = new TreeMap<>();
 
   private final boolean needEmptyFile;
 
   private final Map<Long, String> lowerBoundariesPolicies;
 
+  protected final Function<ExtendedCell, Long> tieringFunction;
+
   /**
    * @param lowerBoundariesPolicies each window to storage policy map.
    * @param needEmptyFile           whether need to create an empty store file if we haven't written
@@ -46,16 +50,30 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
    */
   public DateTieredMultiFileWriter(List<Long> lowerBoundaries,
     Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile) {
+    this(lowerBoundaries, lowerBoundariesPolicies, needEmptyFile, c -> c.getTimestamp());
+  }
+
+  /**
+   * @param lowerBoundariesPolicies each window to storage policy map.
+   * @param needEmptyFile           whether we need to create an empty store file if we haven't
+   *                                written out anything.
+   * @param tieringFunction         extracts the value used to assign a cell to a tiering window.
+   */
+  public DateTieredMultiFileWriter(List<Long> lowerBoundaries,
+    Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile,
+    Function<ExtendedCell, Long> tieringFunction) {
     for (Long lowerBoundary : lowerBoundaries) {
       lowerBoundary2Writer.put(lowerBoundary, null);
     }
     this.needEmptyFile = needEmptyFile;
     this.lowerBoundariesPolicies = lowerBoundariesPolicies;
+    this.tieringFunction = tieringFunction;
   }
 
   @Override
   public void append(ExtendedCell cell) throws IOException {
-    Map.Entry<Long, StoreFileWriter> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp());
+    Map.Entry<Long, StoreFileWriter> entry =
+      lowerBoundary2Writer.floorEntry(tieringFunction.apply(cell));
     StoreFileWriter writer = entry.getValue();
     if (writer == null) {
       String lowerBoundaryStoragePolicy = lowerBoundariesPolicies.get(entry.getKey());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 95a123c0a86..2e679e1a84d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -403,8 +403,9 @@ public abstract class Compactor<T extends CellSink> {
 
   protected abstract void abortWriter(T writer) throws IOException;
 
-  protected void decorateCells(List<ExtendedCell> cells) {
+  protected List<ExtendedCell> decorateCells(List<ExtendedCell> cells) {
     //no op
+    return cells;
   }
 
   /**
@@ -463,7 +464,7 @@ public abstract class Compactor<T extends CellSink> {
       // output to writer:
       Cell lastCleanCell = null;
       long lastCleanCellSeqId = 0;
-      decorateCells(cells);
+      cells = decorateCells(cells);
       for (ExtendedCell c : cells) {
         if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
           lastCleanCell = c;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java
index 6421147c6c0..3a5a0834e87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java
@@ -1,17 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
+
+import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.TIERING_CELL_TIME_RANGE;
 
 /**
  * Custom implementation of DateTieredCompactionPolicy that calculates compaction boundaries based
@@ -30,9 +47,7 @@ public class CustomCellDateTieredCompactionPolicy extends DateTieredCompactionPo
   public static final String AGE_LIMIT_MILLIS =
     "hbase.hstore.compaction.date.tiered.custom.age.limit.millis";
 
-  public static final String TIERING_CELL_MIN = "TIERING_CELL_MIN";
-
-  public static final String TIERING_CELL_MAX = "TIERING_CELL_MAX";
+  public static final String TIERING_CELL_QUALIFIER = "TIERING_CELL_QUALIFIER";
 
   private long cutOffTimestamp;
 
@@ -40,7 +55,7 @@ public class CustomCellDateTieredCompactionPolicy extends DateTieredCompactionPo
     StoreConfigInformation storeConfigInfo) throws IOException {
     super(conf, storeConfigInfo);
     cutOffTimestamp = EnvironmentEdgeManager.currentTime() -
-      conf.getLong(AGE_LIMIT_MILLIS, (long) (10*365.25*24*60*60*1000));
+      conf.getLong(AGE_LIMIT_MILLIS, (long) (10L * 365.25 * 24L * 60L * 60L * 1000L));
   }
 
@@ -49,35 +64,34 @@ public class CustomCellDateTieredCompactionPolicy extends DateTieredCompactionPo
     MutableLong min = new MutableLong(Long.MAX_VALUE);
     MutableLong max = new MutableLong(0);
     filesToCompact.forEach(f -> {
-      byte[] fileMin = f.getMetadataValue(Bytes.toBytes(TIERING_CELL_MIN));
-      byte[] fileMax = f.getMetadataValue(Bytes.toBytes(TIERING_CELL_MAX));
-      if (fileMin != null) {
-        long minCurrent = Bytes.toLong(fileMin);
-        if (min.getValue() < minCurrent) {
-          min.setValue(minCurrent);
-        }
-      } else {
-        min.setValue(0);
-      }
-      if (fileMax != null) {
-        long maxCurrent = Bytes.toLong(fileMax);
-        if (max.getValue() > maxCurrent) {
-          max.setValue(maxCurrent);
-        }
-      } else {
-        max.setValue(Long.MAX_VALUE);
-      }
-    });
+      byte[] timeRangeBytes = f.getMetadataValue(TIERING_CELL_TIME_RANGE);
+      long minCurrent = Long.MAX_VALUE;
+      long maxCurrent = 0;
+      if (timeRangeBytes != null) {
+        try {
+          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(timeRangeBytes);
+          minCurrent = timeRangeTracker.getMin();
+          maxCurrent = timeRangeTracker.getMax();
+        } catch (IOException e) {
+          // Keep the defaults: a file with unreadable tiering metadata contributes no range
+        }
+      }
+      if (minCurrent < min.getValue()) {
+        min.setValue(minCurrent);
+      }
+      if (maxCurrent > max.getValue()) {
+        max.setValue(maxCurrent);
+      }
+    });
     List<Long> boundaries = new ArrayList<>();
+    boundaries.add(Long.MIN_VALUE);
     if (min.getValue() < cutOffTimestamp) {
       boundaries.add(min.getValue());
       if (max.getValue() > cutOffTimestamp) {
         boundaries.add(cutOffTimestamp);
       }
     }
-    boundaries.add(Long.MIN_VALUE);
-    Collections.reverse(boundaries);
     return boundaries;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java
index 8e1afee52e4..21b98b5611b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java
@@ -1,13 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.ExtendedCell;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellDateTieredCompactionPolicy.TIERING_CELL_QUALIFIER;
 
 /**
  * An extension of DateTieredCompactor, overriding the decorateCells method to allow for custom
@@ -15,13 +42,61 @@ import java.util.List;
  */
 @InterfaceAudience.Private
 public class CustomCellTieredCompactor extends DateTieredCompactor {
+
+  private final byte[] tieringQualifier;
+
   public CustomCellTieredCompactor(Configuration conf, HStore store) {
     super(conf, store);
+    String qualifier = conf.get(TIERING_CELL_QUALIFIER);
+    tieringQualifier = qualifier == null ? null : Bytes.toBytes(qualifier);
+  }
+
+  @Override
+  protected List<ExtendedCell> decorateCells(List<ExtendedCell> cells) {
+    // If no tiering qualifier is properly set, skip the whole flow
+    if (tieringQualifier != null) {
+      byte[] tieringValue = null;
+      // First iterate through the cells of the row to find the tiering value for the row
+      for (Cell cell : cells) {
+        byte[] qualifier = new byte[cell.getQualifierLength()];
+        System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(), qualifier, 0,
+          cell.getQualifierLength());
+        if (Arrays.equals(qualifier, tieringQualifier)) {
+          tieringValue = new byte[cell.getValueLength()];
+          System.arraycopy(cell.getValueArray(), cell.getValueOffset(), tieringValue, 0,
+            cell.getValueLength());
+          break;
+        }
+      }
+      if (tieringValue == null) {
+        tieringValue = Bytes.toBytes(Long.MAX_VALUE);
+      }
+      // Now apply the tiering value as a tag to all cells within the row
+      Tag tieringValueTag = new ArrayBackedTag(TagType.CELL_VALUE_TIERING_TAG_TYPE, tieringValue);
+      List<ExtendedCell> newCells = new ArrayList<>(cells.size());
+      for (ExtendedCell cell : cells) {
+        List<Tag> tags = PrivateCellUtil.getTags(cell);
+        tags.add(tieringValueTag);
+        newCells.add(PrivateCellUtil.createCell(cell, tags));
+      }
+      return newCells;
+    } else {
+      return cells;
+    }
+  }
+
+  private long getTieringValue(ExtendedCell cell) {
+    Optional<Tag> tagOptional = PrivateCellUtil.getTag(cell, TagType.CELL_VALUE_TIERING_TAG_TYPE);
+    if (tagOptional.isPresent()) {
+      return Tag.getValueAsLong(tagOptional.get());
+    }
+    return Long.MAX_VALUE;
   }
 
   @Override
-  protected void decorateCells(List<ExtendedCell> cells) {
-    //TODO
+  protected DateTieredMultiFileWriter createMultiWriter(final CompactionRequestImpl request,
+    final List<Long> lowerBoundaries, final Map<Long, String> lowerBoundariesPolicies) {
+    return new CustomTieringMultiFileWriter(lowerBoundaries, lowerBoundariesPolicies,
+      needEmptyFile(request), this::getTieringValue);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index b5911b0cec4..5dacf63ab6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -24,6 +24,7 @@ import java.util.OptionalLong;
 import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -46,7 +47,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
     super(conf, store);
   }
 
-  private boolean needEmptyFile(CompactionRequestImpl request) {
+  protected boolean needEmptyFile(CompactionRequestImpl request) {
     // if we are going to compact the last N files, then we need to emit an empty file to retain the
     // maxSeqId if we haven't written out anything.
     OptionalLong maxSeqId = StoreUtils.getMaxSequenceIdInList(request.getFiles());
@@ -68,16 +69,21 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
       @Override
       public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
-        boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
-        throws IOException {
-        DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
-          lowerBoundariesPolicies, needEmptyFile(request));
+        boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker) throws IOException {
+        DateTieredMultiFileWriter writer =
+          createMultiWriter(request, lowerBoundaries, lowerBoundariesPolicies);
         initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
         return writer;
       }
     }, throughputController, user);
   }
 
+  protected DateTieredMultiFileWriter createMultiWriter(final CompactionRequestImpl request,
+    final List<Long> lowerBoundaries, final Map<Long, String> lowerBoundariesPolicies) {
+    return new DateTieredMultiFileWriter(lowerBoundaries, lowerBoundariesPolicies,
+      needEmptyFile(request), c -> c.getTimestamp());
+  }
+
   @Override
   protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java
new file mode 100644
index 00000000000..f76b6857a41
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.CustomCellTieredStoreEngine;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.TIERING_CELL_TIME_RANGE;
+import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellDateTieredCompactionPolicy.TIERING_CELL_QUALIFIER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestCustomCellTieredCompactor {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCustomCellTieredCompactor.class);
+
+  public static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  protected HBaseTestingUtil utility;
+
+  @Before
+  public void setUp() throws Exception {
+    utility = new HBaseTestingUtil();
+    utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
+    utility.startMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCustomCellTieredCompactor() throws Exception {
+    ColumnFamilyDescriptorBuilder clmBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
+    clmBuilder.setValue("hbase.hstore.engine.class", CustomCellTieredStoreEngine.class.getName());
+    clmBuilder.setValue(TIERING_CELL_QUALIFIER, "date");
+    TableName tableName = TableName.valueOf("testCustomCellTieredCompactor");
+    TableDescriptorBuilder tblBuilder = TableDescriptorBuilder.newBuilder(tableName);
+    tblBuilder.setColumnFamily(clmBuilder.build());
+    utility.getAdmin().createTable(tblBuilder.build());
+    utility.waitTableAvailable(tableName);
+    Connection connection = utility.getConnection();
+    Table table = connection.getTable(tableName);
+    long recordTime = System.currentTimeMillis();
+    // Write data and flush multiple store files: each batch carries one row whose tiering
+    // value is older than the age limit and one row with a recent tiering value
+    for (int i = 0; i < 6; i++) {
+      List<Put> puts = new ArrayList<>(2);
+      Put put = new Put(Bytes.toBytes(i));
+      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i));
+      put.addColumn(FAMILY, Bytes.toBytes("date"),
+        Bytes.toBytes(recordTime - (11L * 366L * 24L * 60L * 60L * 1000L)));
+      puts.add(put);
+      put = new Put(Bytes.toBytes(i + 1000));
+      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i + 1000)));
+      put.addColumn(FAMILY, Bytes.toBytes("date"), Bytes.toBytes(recordTime));
+      puts.add(put);
+      table.put(puts);
+      utility.flush(tableName);
+    }
+    table.close();
+    long firstCompactionTime = System.currentTimeMillis();
+    utility.getAdmin().majorCompact(tableName);
+    Waiter.waitFor(utility.getConfiguration(), 5000,
+      () -> utility.getMiniHBaseCluster().getMaster()
+        .getLastMajorCompactionTimestamp(tableName) > firstCompactionTime);
+    long numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    // The first major compaction cannot detect more than one tier: the selected files came
+    // from flushes, so their file info carries no min/max tiering values and
+    // CustomCellDateTieredCompactionPolicy cannot calculate the proper boundaries.
+    assertEquals(1, numHFiles);
+    utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles()
+      .forEach(file -> {
+        byte[] rangeBytes = file.getMetadataValue(TIERING_CELL_TIME_RANGE);
+        assertNotNull(rangeBytes);
+        try {
+          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
+          assertEquals(recordTime - (11L * 366L * 24L * 60L * 60L * 1000L),
+            timeRangeTracker.getMin());
+          assertEquals(recordTime, timeRangeTracker.getMax());
+        } catch (IOException e) {
+          fail(e.getMessage());
+        }
+      });
+    // Now major compact again, to make sure we write two separate files
+    long secondCompactionTime = System.currentTimeMillis();
+    utility.getAdmin().majorCompact(tableName);
+    Waiter.waitFor(utility.getConfiguration(), 5000,
+      () -> utility.getMiniHBaseCluster().getMaster()
+        .getLastMajorCompactionTimestamp(tableName) > secondCompactionTime);
+    numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    assertEquals(2, numHFiles);
+    utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles()
+      .forEach(file -> {
+        byte[] rangeBytes = file.getMetadataValue(TIERING_CELL_TIME_RANGE);
+        assertNotNull(rangeBytes);
+        try {
+          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
+          // After re-compaction each file covers a single tier, so min and max coincide
+          assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax());
+        } catch (IOException e) {
+          fail(e.getMessage());
+        }
+      });
+  }
+}
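
For anyone trying this branch out, the sketch below shows how a client could opt a table into the custom qualifier tiered compaction exercised by TestCustomCellTieredCompactor above. It is illustrative only and not part of the commit: the table name, qualifier name, example class name, and the assumption that the age-limit property can be overridden at the column-family level are mine.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class CustomTieredCompactionExample {
  public static void main(String[] args) throws Exception {
    try (Connection connection = ConnectionFactory.createConnection();
      Admin admin = connection.getAdmin()) {
      ColumnFamilyDescriptorBuilder cf = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes("cf"))
        // Route compactions for this family through the new store engine
        .setValue("hbase.hstore.engine.class",
          "org.apache.hadoop.hbase.regionserver.CustomCellTieredStoreEngine")
        // Qualifier whose 8-byte long value supplies the per-row tiering timestamp
        .setValue("TIERING_CELL_QUALIFIER", "date")
        // Assumed per-family override of the age cut-off (the policy defaults to ~10
        // years); rows older than this land in the cold-tier output file
        .setValue("hbase.hstore.compaction.date.tiered.custom.age.limit.millis",
          String.valueOf(365L * 24 * 60 * 60 * 1000));
      admin.createTable(TableDescriptorBuilder
        .newBuilder(TableName.valueOf("events")) // illustrative table name
        .setColumnFamily(cf.build())
        .build());
    }
  }
}

Rows that never write the "date" qualifier are tagged with Long.MAX_VALUE by CustomCellTieredCompactor.decorateCells, so they always sort into the newest tier.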
