This is an automated email from the ASF dual-hosted git repository.

niuyulin pushed a commit to branch HBASE-25714
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-25714 by this push:
     new 85f0291  HBASE-26043 CompactionServer support compact TTL data (#3494)
85f0291 is described below

commit 85f02919da5498399ab727b6273e7b969dde9ad9
Author: niuyulin <[email protected]>
AuthorDate: Mon Jul 26 17:14:38 2021 +0800

    HBASE-26043 CompactionServer support compact TTL data (#3494)
    
    Signed-off-by: Duo Zhang <[email protected]>
---
 .../compactionserver/CompactionThreadManager.java  |  6 ++-
 .../apache/hadoop/hbase/regionserver/HStore.java   |  8 ++--
 .../compactionserver/TestCompactionServer.java     | 46 ++++++++++++++++++++++
 3 files changed, 55 insertions(+), 5 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
index bd78cc4..2eb697d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
@@ -99,7 +99,8 @@ public class CompactionThreadManager implements 
ThroughputControllerService {
     this.server = server;
     try {
       this.rootDir = CommonFSUtils.getRootDir(this.conf);
-      this.tableDescriptors = new FSTableDescriptors(conf);
+      this.tableDescriptors = new 
FSTableDescriptors(CommonFSUtils.getCurrentFileSystem(conf),
+          CommonFSUtils.getRootDir(conf), true, false);
       int largeThreads =
           Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, 
LARGE_COMPACTION_THREADS_DEFAULT));
       int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, 
SMALL_COMPACTION_THREADS_DEFAULT);
@@ -213,7 +214,8 @@ public class CompactionThreadManager implements 
ThroughputControllerService {
     tableDescriptors.get(regionInfo.getTable());
     HStore store = getStore(conf, server.getFileSystem(), rootDir,
       tableDescriptors.get(regionInfo.getTable()), regionInfo, 
cfd.getNameAsString());
-
+    // handle TTL case
+    store.removeUnneededFiles(false);
     // CompactedHFilesDischarger only run on regionserver, so compactionserver 
does not have
     // opportunity to clean compacted file at that time, we clean compacted 
files here
     compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 5cd005a..0569ad3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1928,7 +1928,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
       return Optional.empty();
     }
     // Before we do compaction, try to get rid of unneeded files to simplify 
things.
-    removeUnneededFiles();
+    removeUnneededFiles(true);
 
     if (region.getRegionServerServices() != null
         && region.getRegionServerServices().isCompactionOffloadEnabled()
@@ -2063,7 +2063,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
     Collections.sort(filesCompacting, 
storeEngine.getStoreFileManager().getStoreFileComparator());
   }
 
-  private void removeUnneededFiles() throws IOException {
+  public void removeUnneededFiles(boolean writeWalRecord) throws IOException {
     if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
       return;
     }
@@ -2092,7 +2092,9 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
     }
 
     Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
-    writeCompactionWalRecord(delSfs, newFiles);
+    if (writeWalRecord) {
+      writeCompactionWalRecord(delSfs, newFiles);
+    }
     replaceStoreFiles(delSfs, newFiles);
     refreshStoreSizeAndTotalBytes();
     LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) 
file(s) in "
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
index 6d0cccc..18b07d9 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
@@ -304,4 +304,50 @@ public class TestCompactionServer {
     TEST_UTIL.compact(TABLENAME, false);
     TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
   }
+
+  @Test
+  public void testCompactionWithTTL() throws Exception {
+    TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
+    ColumnFamilyDescriptor cfd =
+      
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).setTimeToLive(10).build();
+    TableDescriptor modifiedTableDescriptor = 
TableDescriptorBuilder.newBuilder(TABLENAME)
+      .setColumnFamily(cfd).setCompactionOffloadEnabled(true).build();
+    TEST_UTIL.getAdmin().modifyTable(modifiedTableDescriptor);
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+    // generate hfile all kv are expired
+    doPutRecord(1, 500, true);
+    int kVCount = 0;
+    for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+      for (HStoreFile hStoreFile : 
region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
+        kVCount += 
hStoreFile.getReader().getHFileReader().getTrailer().getEntryCount();
+      }
+    }
+    assertEquals(500, kVCount);
+    // generate hfile mixed with expired and valid KV
+    doPutRecord(1, 500, false);
+    Thread.sleep(10000);
+    doPutRecord(501, 1000, true);
+
+    TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
+    TEST_UTIL.compact(TABLENAME, true);
+    TEST_UTIL.waitFor(60000, () -> {
+      int hFileCount = 0;
+      for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) 
{
+        hFileCount += 
region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount();
+      }
+      return hFileCount == 1;
+    });
+    kVCount = 0;
+    for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+      for (HStoreFile hStoreFile : 
region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
+        kVCount += 
hStoreFile.getReader().getHFileReader().getTrailer().getEntryCount();
+      }
+    }
+    // To ensure do compaction on compaction server
+    TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
+
+    assertEquals(500, kVCount);
+    verifyRecord(1,500, false);
+    verifyRecord(501,1000, true);
+  }
 }

Reply via email to