This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch HBASE-29427
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 8510a8986db5678f9ac5c09b6a6acbeb69878246
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Fri Nov 8 10:07:00 2024 +0000

    HBASE-29413 Implement a custom qualifier tiered compaction
    
    Change-Id: I6eff2199afbc8c02426b07b696c18af258bcb36c
---
 .../main/java/org/apache/hadoop/hbase/TagType.java |   2 +
 .../hadoop/hbase/io/hfile/HFileWriterImpl.java     |  12 +-
 .../regionserver/CustomCellTieredStoreEngine.java  |   9 ++
 .../regionserver/CustomTieringMultiFileWriter.java |  73 ++++++++++
 .../regionserver/DateTieredMultiFileWriter.java    |  21 ++-
 .../hbase/regionserver/compactions/Compactor.java  |   5 +-
 .../CustomCellDateTieredCompactionPolicy.java      |  67 +++++----
 .../compactions/CustomCellTieredCompactor.java     |  80 ++++++++++-
 .../compactions/DateTieredCompactor.java           |  16 ++-
 .../compactions/TestCustomCellTieredCompactor.java | 150 +++++++++++++++++++++
 10 files changed, 396 insertions(+), 39 deletions(-)
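
Ahead of the file-by-file diff, a minimal sketch of how a table opts in (it mirrors the setup in TestCustomCellTieredCompactor at the end of this patch; the family name, table name and the "date" qualifier are illustrative, and admin stands for a connected Admin instance):

    // Illustrative only: enable the custom tiered engine on a column family and
    // name the qualifier whose (long) value drives the tiering decision.
    ColumnFamilyDescriptorBuilder cfb = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"));
    cfb.setValue("hbase.hstore.engine.class", CustomCellTieredStoreEngine.class.getName());
    cfb.setValue("TIERING_CELL_QUALIFIER", "date");
    admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf("example"))
      .setColumnFamily(cfb.build()).build());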

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index eb9a7f3eccc..811710991eb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -36,4 +36,6 @@ public final class TagType {
   // String based tag type used in replication
   public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
   public static final byte TTL_TAG_TYPE = (byte) 8;
+  // Tag carrying the custom cell tiering value for the row
+  public static final byte CELL_VALUE_TIERING_TAG_TYPE = (byte) 9;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 96bfe42f1fd..8d58032799d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -29,6 +29,8 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -127,6 +129,12 @@ public class HFileWriterImpl implements HFile.Writer {
   /** Cache configuration for caching data on write. */
   protected final CacheConfig cacheConf;
 
+  public void setTimeRangeToTrack(Supplier<TimeRangeTracker> timeRangeToTrack) {
+    this.timeRangeToTrack = timeRangeToTrack;
+  }
+
+  private Supplier<TimeRangeTracker> timeRangeToTrack;
+
   /**
   * Name for this object used when logging or in toString. Is either the result of a toString on
    * stream or else name of passed file Path.
@@ -186,7 +194,9 @@ public class HFileWriterImpl implements HFile.Writer {
     this.path = path;
     this.name = path != null ? path.getName() : outputStream.toString();
     this.hFileContext = fileContext;
+    // TODO: move this back to the upper layer
     this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
+    this.timeRangeToTrack = () -> this.timeRangeTracker;
     DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
     if (encoding != DataBlockEncoding.NONE) {
       this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
@@ -588,7 +598,7 @@ public class HFileWriterImpl implements HFile.Writer {
   }
 
   private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
-    Optional<Boolean> result = cache.shouldCacheBlock(key, timeRangeTracker, conf);
+    Optional<Boolean> result = cache.shouldCacheBlock(key, timeRangeToTrack.get(), conf);
     return result.orElse(true);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java
index fb940f41155..4f061736612 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomCellTieredStoreEngine.java
@@ -19,11 +19,18 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreFileManager;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFileComparators;
 import org.apache.hadoop.hbase.regionserver.compactions.CustomCellDateTieredCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredCompactor;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
 import java.io.IOException;
 import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;
+import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellDateTieredCompactionPolicy.TIERING_CELL_QUALIFIER;
 
 /**
  * Extension of {@link DateTieredStoreEngine} that uses a pluggable value provider for
@@ -41,6 +48,9 @@ public class CustomCellTieredStoreEngine extends DateTieredStoreEngine {
   protected void createComponents(Configuration conf, HStore store, CellComparator kvComparator)
     throws IOException {
     conf = new Configuration(conf);
+    if (store.conf.get(TIERING_CELL_QUALIFIER) != null) {
+      conf.set(TIERING_CELL_QUALIFIER, store.conf.get(TIERING_CELL_QUALIFIER));
+    }
     conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
       CustomCellDateTieredCompactionPolicy.class.getName());
     createCompactionPolicy(conf, store);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java
new file mode 100644
index 00000000000..a09c45f837e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.function.Function;
+
[email protected]
+public class CustomTieringMultiFileWriter extends DateTieredMultiFileWriter {
+
+  public static final byte[] TIERING_CELL_TIME_RANGE =
+    Bytes.toBytes("TIERING_CELL_TIME_RANGE");
+
+  private final NavigableMap<Long, TimeRangeTracker> lowerBoundary2TimeRange = new TreeMap<>();
+
+  public CustomTieringMultiFileWriter(List<Long> lowerBoundaries,
+    Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile,
+    Function<ExtendedCell, Long> tieringFunction) {
+    super(lowerBoundaries, lowerBoundariesPolicies, needEmptyFile, tieringFunction);
+    for (Long lowerBoundary : lowerBoundaries) {
+      lowerBoundary2TimeRange.put(lowerBoundary, null);
+    }
+  }
+
+  @Override
+  public void append(ExtendedCell cell) throws IOException {
+    super.append(cell);
+    long tieringValue = tieringFunction.apply(cell);
+    Map.Entry<Long, TimeRangeTracker> entry =
+      lowerBoundary2TimeRange.floorEntry(tieringValue);
+    if (entry.getValue() == null) {
+      TimeRangeTracker timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
+      timeRangeTracker.setMin(tieringValue);
+      timeRangeTracker.setMax(tieringValue);
+      lowerBoundary2TimeRange.put(entry.getKey(), timeRangeTracker);
+      ((HFileWriterImpl) lowerBoundary2Writer.get(entry.getKey()).getLiveFileWriter())
+        .setTimeRangeToTrack(() -> timeRangeTracker);
+    } else {
+      TimeRangeTracker timeRangeTracker = entry.getValue();
+      if (timeRangeTracker.getMin() > tieringValue) {
+        timeRangeTracker.setMin(tieringValue);
+      }
+      if (timeRangeTracker.getMax() < tieringValue) {
+        timeRangeTracker.setMax(tieringValue);
+      }
+    }
+  }
+
+  @Override
+  public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
+    Collection<HStoreFile> storeFiles) throws IOException {
+    for (Map.Entry<Long, StoreFileWriter> entry : this.lowerBoundary2Writer.entrySet()) {
+      StoreFileWriter writer = entry.getValue();
+      TimeRangeTracker tracker = lowerBoundary2TimeRange.get(entry.getKey());
+      if (writer != null && tracker != null) {
+        writer.appendFileInfo(TIERING_CELL_TIME_RANGE, TimeRangeTracker.toByteArray(tracker));
+      }
+    }
+    return super.commitWriters(maxSeqId, majorCompaction, storeFiles);
+  }
+
+}
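
The range persisted above under TIERING_CELL_TIME_RANGE is what the compaction policy later reads back from each selected file. A minimal sketch of that consumer side (storeFile stands for any HStoreFile; the real lookup lives in CustomCellDateTieredCompactionPolicy below):

    // Illustrative only: recover the tiering range a compaction wrote for a store file.
    byte[] rangeBytes = storeFile.getMetadataValue(CustomTieringMultiFileWriter.TIERING_CELL_TIME_RANGE);
    if (rangeBytes != null) {
      TimeRangeTracker range = TimeRangeTracker.parseFrom(rangeBytes);
      long min = range.getMin(); // smallest tiering value routed into this file
      long max = range.getMax(); // largest tiering value routed into this file
    }
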
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
index b800178e8a2..5796d7c890e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import org.apache.hadoop.hbase.ExtendedCell;
+import java.util.function.Function;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -33,12 +35,14 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
 
-  private final NavigableMap<Long, StoreFileWriter> lowerBoundary2Writer = new TreeMap<>();
+  protected final NavigableMap<Long, StoreFileWriter> lowerBoundary2Writer = new TreeMap<>();
 
   private final boolean needEmptyFile;
 
   private final Map<Long, String> lowerBoundariesPolicies;
 
+  protected Function<ExtendedCell, Long> tieringFunction;
+
   /**
    * @param lowerBoundariesPolicies each window to storage policy map.
   * @param needEmptyFile           whether need to create an empty store file if we haven't written
@@ -46,16 +50,30 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
    */
   public DateTieredMultiFileWriter(List<Long> lowerBoundaries,
     Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile) {
+    this(lowerBoundaries, lowerBoundariesPolicies, needEmptyFile, c -> c.getTimestamp());
+  }
+
+  /**
+   * @param lowerBoundariesPolicies each window to storage policy map.
+   * @param needEmptyFile           whether need to create an empty store file if we haven't written
+   *                                out anything.
+   * @param tieringFunction         maps a cell to the value used to pick the cell's target window.
+   */
+  public DateTieredMultiFileWriter(List<Long> lowerBoundaries,
+    Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile,
+    Function<ExtendedCell, Long> tieringFunction) {
     for (Long lowerBoundary : lowerBoundaries) {
       lowerBoundary2Writer.put(lowerBoundary, null);
     }
     this.needEmptyFile = needEmptyFile;
     this.lowerBoundariesPolicies = lowerBoundariesPolicies;
+    this.tieringFunction = tieringFunction;
   }
 
   @Override
   public void append(ExtendedCell cell) throws IOException {
-    Map.Entry<Long, StoreFileWriter> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp());
+    Map.Entry<Long, StoreFileWriter> entry =
+      lowerBoundary2Writer.floorEntry(tieringFunction.apply(cell));
     StoreFileWriter writer = entry.getValue();
     if (writer == null) {
       String lowerBoundaryStoragePolicy = lowerBoundariesPolicies.get(entry.getKey());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 95a123c0a86..2e679e1a84d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -403,8 +403,9 @@ public abstract class Compactor<T extends CellSink> {
 
   protected abstract void abortWriter(T writer) throws IOException;
 
-  protected void decorateCells(List<ExtendedCell> cells) {
+  protected List<ExtendedCell> decorateCells(List<ExtendedCell> cells) {
     //no op
+    return cells;
   }
 
   /**
@@ -463,7 +464,7 @@ public abstract class Compactor<T extends CellSink> {
         // output to writer:
         Cell lastCleanCell = null;
         long lastCleanCellSeqId = 0;
-        decorateCells(cells);
+        cells = decorateCells(cells);
         for (ExtendedCell c : cells) {
           if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
             lastCleanCell = c;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java
index 6421147c6c0..3a5a0834e87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellDateTieredCompactionPolicy.java
@@ -1,17 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
+import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.TIERING_CELL_TIME_RANGE;
 
 /**
 * Custom implementation of DateTieredCompactionPolicy that calculates compaction boundaries based
@@ -30,9 +47,7 @@ public class CustomCellDateTieredCompactionPolicy extends DateTieredCompactionPolicy
   public static final String AGE_LIMIT_MILLIS =
     "hbase.hstore.compaction.date.tiered.custom.age.limit.millis";
 
-  public static final String TIERING_CELL_MIN = "TIERING_CELL_MIN";
-
-  public static final String TIERING_CELL_MAX = "TIERING_CELL_MAX";
+  public static final String TIERING_CELL_QUALIFIER = "TIERING_CELL_QUALIFIER";
 
   private long cutOffTimestamp;
 
@@ -40,7 +55,7 @@ public class CustomCellDateTieredCompactionPolicy extends DateTieredCompactionPolicy
     StoreConfigInformation storeConfigInfo) throws IOException {
     super(conf, storeConfigInfo);
     cutOffTimestamp = EnvironmentEdgeManager.currentTime() -
-      conf.getLong(AGE_LIMIT_MILLIS, (long) (10*365.25*24*60*60*1000));
+      conf.getLong(AGE_LIMIT_MILLIS, (long) (10L*365.25*24L*60L*60L*1000L));
 
   }
 
@@ -49,35 +64,34 @@ public class CustomCellDateTieredCompactionPolicy extends DateTieredCompactionPolicy
     MutableLong min = new MutableLong(Long.MAX_VALUE);
     MutableLong max = new MutableLong(0);
     filesToCompact.forEach(f -> {
-        byte[] fileMin = f.getMetadataValue(Bytes.toBytes(TIERING_CELL_MIN));
-        byte[] fileMax = f.getMetadataValue(Bytes.toBytes(TIERING_CELL_MAX));
-        if (fileMin != null) {
-          long minCurrent = Bytes.toLong(fileMin);
-          if(min.getValue() < minCurrent) {
-            min.setValue(minCurrent);
-          }
-        } else {
-          min.setValue(0);
-        }
-        if (fileMax != null) {
-          long maxCurrent = Bytes.toLong(fileMax);
-          if(max.getValue() > maxCurrent) {
-            max.setValue(maxCurrent);
-          }
-        } else {
-          max.setValue(Long.MAX_VALUE);
+      byte[] timeRangeBytes = f.getMetadataValue(TIERING_CELL_TIME_RANGE);
+      long minCurrent = Long.MAX_VALUE;
+      long maxCurrent = 0;
+      if (timeRangeBytes != null) {
+        try {
+          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(timeRangeBytes);
+          minCurrent = timeRangeTracker.getMin();
+          maxCurrent = timeRangeTracker.getMax();
+        } catch (IOException e) {
+          // TODO: debug this
         }
-      });
+      }
+      if (minCurrent < min.getValue()) {
+        min.setValue(minCurrent);
+      }
+      if (maxCurrent > max.getValue()) {
+        max.setValue(maxCurrent);
+      }
+    });
 
     List<Long> boundaries = new ArrayList<>();
+    boundaries.add(Long.MIN_VALUE);
     if (min.getValue() < cutOffTimestamp) {
       boundaries.add(min.getValue());
       if (max.getValue() > cutOffTimestamp) {
         boundaries.add(cutOffTimestamp);
       }
     }
-    boundaries.add(Long.MIN_VALUE);
-    Collections.reverse(boundaries);
     return boundaries;
   }
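
A concrete walk-through of the boundary math above, using the scenario exercised by TestCustomCellTieredCompactor at the end of this patch (values illustrative):

    // With the default ten-year age limit: cutOffTimestamp = now - 10 years.
    // Suppose the files' TIERING_CELL_TIME_RANGE metadata yields
    //   min = now - 11 years and max = now.
    // min < cutOffTimestamp  -> boundaries = [Long.MIN_VALUE, now - 11 years]
    // max > cutOffTimestamp  -> boundaries = [Long.MIN_VALUE, now - 11 years, now - 10 years]
    // Result: one window for values older than the cut-off and one for newer values,
    // which is why the second major compaction in the test emits exactly two files.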
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java
index 8e1afee52e4..21b98b5611b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredCompactor.java
@@ -1,13 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.ExtendedCell;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellDateTieredCompactionPolicy.TIERING_CELL_QUALIFIER;
 
 /**
 * An extension of DateTieredCompactor, overriding the decorateCells method to allow for custom
@@ -15,13 +43,58 @@ import java.util.List;
  */
 @InterfaceAudience.Private
 public class CustomCellTieredCompactor extends DateTieredCompactor {
+
+  private byte[] tieringQualifier;
+
   public CustomCellTieredCompactor(Configuration conf, HStore store) {
     super(conf, store);
+    String tieringQualifierValue = conf.get(TIERING_CELL_QUALIFIER);
+    tieringQualifier = tieringQualifierValue != null ? Bytes.toBytes(tieringQualifierValue) : null;
+  }
+
+  @Override
+  protected List<ExtendedCell> decorateCells(List<ExtendedCell> cells) {
+    // If no tiering qualifier is properly set, skip the whole flow.
+    if (tieringQualifier != null) {
+      byte[] tieringValue = null;
+      // First iterate through the cells within a row to find the tiering value for the row.
+      for (Cell cell : cells) {
+        if (Arrays.equals(CellUtil.cloneQualifier(cell), tieringQualifier)) {
+          tieringValue = CellUtil.cloneValue(cell);
+          break;
+        }
+      }
+      if (tieringValue == null) {
+        tieringValue = Bytes.toBytes(Long.MAX_VALUE);
+      }
+      // Now apply the tiering value as a tag to all cells within the row.
+      Tag tieringValueTag = new ArrayBackedTag(TagType.CELL_VALUE_TIERING_TAG_TYPE, tieringValue);
+      List<ExtendedCell> newCells = new ArrayList<>(cells.size());
+      for (ExtendedCell cell : cells) {
+        List<Tag> tags = PrivateCellUtil.getTags(cell);
+        tags.add(tieringValueTag);
+        newCells.add(PrivateCellUtil.createCell(cell, tags));
+      }
+      return newCells;
+    } else {
+      return cells;
+    }
+  }
+
+  private long getTieringValue(ExtendedCell cell) {
+    Optional<Tag> tagOptional = PrivateCellUtil.getTag(cell, TagType.CELL_VALUE_TIERING_TAG_TYPE);
+    if (tagOptional.isPresent()) {
+      Tag tag = tagOptional.get();
+      return Bytes.toLong(Tag.cloneValue(tag));
+    }
+    return Long.MAX_VALUE;
+  }
 
   @Override
-  protected void decorateCells(List<ExtendedCell> cells) {
-    //TODO
+  protected DateTieredMultiFileWriter createMultiWriter(final CompactionRequestImpl request,
+    final List<Long> lowerBoundaries, final Map<Long, String> lowerBoundariesPolicies) {
+    return new CustomTieringMultiFileWriter(lowerBoundaries, lowerBoundariesPolicies,
+      needEmptyFile(request), this::getTieringValue);
   }
 
 }
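
The tag round trip that decorateCells() and getTieringValue() implement, in one place (a sketch only; cell stands for any ExtendedCell passing through a compaction, and the long value is an arbitrary example):

    // Illustrative only: stamp a cell with a tiering value, then read it back.
    Tag tieringTag = new ArrayBackedTag(TagType.CELL_VALUE_TIERING_TAG_TYPE,
      Bytes.toBytes(1690000000000L)); // the row's tiering value, as a long
    List<Tag> tags = PrivateCellUtil.getTags(cell); // keep any tags already present
    tags.add(tieringTag);
    ExtendedCell tagged = PrivateCellUtil.createCell(cell, tags);
    // Later, when routing cells to per-tier writers:
    long tieringValue = PrivateCellUtil.getTag(tagged, TagType.CELL_VALUE_TIERING_TAG_TYPE)
      .map(t -> Bytes.toLong(Tag.cloneValue(t)))
      .orElse(Long.MAX_VALUE);
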
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index b5911b0cec4..5dacf63ab6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -24,6 +24,7 @@ import java.util.OptionalLong;
 import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -46,7 +47,7 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTieredMultiFileWriter>
     super(conf, store);
   }
 
-  private boolean needEmptyFile(CompactionRequestImpl request) {
+  protected boolean needEmptyFile(CompactionRequestImpl request) {
     // if we are going to compact the last N files, then we need to emit an empty file to retain the
     // maxSeqId if we haven't written out anything.
     OptionalLong maxSeqId = StoreUtils.getMaxSequenceIdInList(request.getFiles());
@@ -68,16 +69,22 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTieredMultiFileWriter>
 
         @Override
         public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd,
           boolean shouldDropBehind, boolean major, Consumer<Path> writerCreationTracker)
           throws IOException {
-          DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries,
-            lowerBoundariesPolicies, needEmptyFile(request));
+          DateTieredMultiFileWriter writer =
+            createMultiWriter(request, lowerBoundaries, lowerBoundariesPolicies);
           initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker);
           return writer;
         }
       }, throughputController, user);
   }
 
+  protected DateTieredMultiFileWriter createMultiWriter(final CompactionRequestImpl request,
+    final List<Long> lowerBoundaries, final Map<Long, String> lowerBoundariesPolicies) {
+    return new DateTieredMultiFileWriter(lowerBoundaries,
+      lowerBoundariesPolicies, needEmptyFile(request), c -> c.getTimestamp());
+  }
+
   @Override
   protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
     CompactionRequestImpl request) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java
new file mode 100644
index 00000000000..f76b6857a41
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.CustomCellTieredStoreEngine;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.TIERING_CELL_TIME_RANGE;
+import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellDateTieredCompactionPolicy.TIERING_CELL_QUALIFIER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestCustomCellTieredCompactor {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCustomCellTieredCompactor.class);
+
+  public static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  protected HBaseTestingUtil utility;
+
+  protected Admin admin;
+
+  @Before
+  public void setUp() throws Exception {
+    utility = new HBaseTestingUtil();
+    utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
+    utility.startMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCustomCellTieredCompactor() throws Exception {
+    ColumnFamilyDescriptorBuilder clmBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
+    clmBuilder.setValue("hbase.hstore.engine.class", CustomCellTieredStoreEngine.class.getName());
+    clmBuilder.setValue(TIERING_CELL_QUALIFIER, "date");
+    TableName tableName = TableName.valueOf("testCustomCellTieredCompactor");
+    TableDescriptorBuilder tblBuilder = TableDescriptorBuilder.newBuilder(tableName);
+    tblBuilder.setColumnFamily(clmBuilder.build());
+    utility.getAdmin().createTable(tblBuilder.build());
+    utility.waitTableAvailable(tableName);
+    Connection connection = utility.getConnection();
+    Table table = connection.getTable(tableName);
+    long recordTime = System.currentTimeMillis();
+    // write data and flush multiple store files:
+    for (int i = 0; i < 6; i++) {
+      List<Put> puts = new ArrayList<>(2);
+      Put put = new Put(Bytes.toBytes(i));
+      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i));
+      put.addColumn(FAMILY, Bytes.toBytes("date"),
+        Bytes.toBytes(recordTime - (11L*366L*24L*60L*60L*1000L)));
+      puts.add(put);
+      put = new Put(Bytes.toBytes(i + 1000));
+      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i + 1000)));
+      put.addColumn(FAMILY, Bytes.toBytes("date"),
+        Bytes.toBytes(recordTime));
+      puts.add(put);
+      table.put(puts);
+      utility.flush(tableName);
+    }
+    table.close();
+    long firstCompactionTime = System.currentTimeMillis();
+    utility.getAdmin().majorCompact(tableName);
+    Waiter.waitFor(utility.getConfiguration(), 5000,
+      () -> utility.getMiniHBaseCluster().getMaster()
+        .getLastMajorCompactionTimestamp(tableName) > firstCompactionTime);
+    long numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    // The first major compaction has no way to detect more than one tier,
+    // because without the min/max values available in the file info portion
+    // of the files selected for compaction, CustomCellDateTieredCompactionPolicy
+    // has no means to calculate the proper boundaries.
+    assertEquals(1, numHFiles);
+    utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles().forEach(
+      file -> {
+        byte[] rangeBytes = file.getMetadataValue(TIERING_CELL_TIME_RANGE);
+        assertNotNull(rangeBytes);
+        try {
+          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
+          assertEquals((recordTime - (11L*366L*24L*60L*60L*1000L)), timeRangeTracker.getMin());
+          assertEquals(recordTime, timeRangeTracker.getMax());
+        } catch (IOException e) {
+          fail(e.getMessage());
+        }
+      }
+    );
+    // Now run a major compaction again, to make sure we write two separate files.
+    long secondCompactionTime = System.currentTimeMillis();
+    utility.getAdmin().majorCompact(tableName);
+    Waiter.waitFor(utility.getConfiguration(), 5000,
+      () -> utility.getMiniHBaseCluster().getMaster()
+        .getLastMajorCompactionTimestamp(tableName) > secondCompactionTime);
+    numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    assertEquals(2, numHFiles);
+    utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles().forEach(
+      file -> {
+        byte[] rangeBytes = file.getMetadataValue(TIERING_CELL_TIME_RANGE);
+        assertNotNull(rangeBytes);
+        try {
+          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
+          assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax());
+        } catch (IOException e) {
+          fail(e.getMessage());
+        }
+      }
+    );
+  }
+}
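
To run just this test locally, the standard Maven surefire filter applies: mvn -pl hbase-server test -Dtest=TestCustomCellTieredCompactor.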
