This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 5214369  HIVE-25883: Enhance Compaction Cleaner to skip when there is nothing to do (#2958) (Zoltan Haindrich reviewed by Denys Kuzmenko)
5214369 is described below

commit 5214369d05ac02f36b6723c336cbdb953ea3c61c
Author: Zoltan Haindrich <k...@rxd.hu>
AuthorDate: Sun Jan 23 08:49:30 2022 +0100

    HIVE-25883: Enhance Compaction Cleaner to skip when there is nothing to do (#2958) (Zoltan Haindrich reviewed by Denys Kuzmenko)
---
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |  57 +++++++---
 .../hadoop/hive/ql/txn/compactor/TestCleaner.java  | 116 +++++++++++++++++++--
 2 files changed, 154 insertions(+), 19 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index db65250..8d724f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -52,6 +51,7 @@ import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
 import org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -60,6 +60,8 @@ import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedBaseLight;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.Ref;
@@ -208,7 +210,7 @@ public class Cleaner extends MetaStoreCompactorThread {
       }
      Optional<String> location = Optional.ofNullable(ci.properties).map(StringableMap::new)
           .map(config -> config.get("location"));
-      
+
       Callable<Boolean> cleanUpTask;
       Table t = null;
       Partition p = resolvePartition(ci);
@@ -248,12 +250,12 @@ public class Cleaner extends MetaStoreCompactorThread {
 
       if (t != null) {
         StorageDescriptor sd = resolveStorageDescriptor(t, p);
-        cleanUpTask = () -> removeFiles(location.orElse(sd.getLocation()), minOpenTxnGLB, ci, 
+        cleanUpTask = () -> removeFiles(location.orElse(sd.getLocation()), minOpenTxnGLB, ci,
             ci.partName != null && p == null);
       } else {
         cleanUpTask = () -> removeFiles(location.get(), ci);
       }
-      
+
       Ref<Boolean> removedFiles = Ref.from(false);
       if (runJobAsSelf(ci.runAs)) {
         removedFiles.value = cleanUpTask.call();
@@ -328,7 +330,7 @@ public class Cleaner extends MetaStoreCompactorThread {
     if (dropPartition) {
       LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
       LockResponse res = null;
-      
+
       try {
         res = txnHandler.lock(lockRequest);
         if (res.getState() == LockState.ACQUIRED) {
@@ -349,7 +351,7 @@ public class Cleaner extends MetaStoreCompactorThread {
         }
       }
     }
-      
+
     ValidTxnList validTxnList =
       TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
     //save it so that getAcidState() sees it
@@ -388,7 +390,7 @@ public class Cleaner extends MetaStoreCompactorThread {
     // Creating 'reader' list since we are interested in the set of 'obsolete' files
     ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
     LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
-    
+
     return removeFiles(location, validWriteIdList, ci);
   }
   /**
@@ -419,6 +421,10 @@ public class Cleaner extends MetaStoreCompactorThread {
      // Including obsolete directories for partitioned tables can result in data loss.
       obsoleteDirs = dir.getAbortedDirectories();
     }
+    if (obsoleteDirs.isEmpty() && !hasDataBelowWatermark(fs, path, writeIdList.getHighWatermark())) {
+      LOG.info(idWatermark(ci) + " nothing to remove below watermark " + writeIdList.getHighWatermark() + ", ");
+      return true;
+    }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
     boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
@@ -428,6 +434,33 @@ public class Cleaner extends MetaStoreCompactorThread {
     return success;
   }
 
+  private boolean hasDataBelowWatermark(FileSystem fs, Path path, long highWatermark) throws IOException {
+    FileStatus[] children = fs.listStatus(path);
+    for (FileStatus child : children) {
+      if (isFileBelowWatermark(child, highWatermark)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean isFileBelowWatermark(FileStatus child, long highWatermark) {
+    Path p = child.getPath();
+    String fn = p.getName();
+    if (!child.isDirectory()) {
+      return false;
+    }
+    if (fn.startsWith(AcidUtils.BASE_PREFIX)) {
+      ParsedBaseLight b = ParsedBaseLight.parseBase(p);
+      return b.getWriteId() < highWatermark;
+    }
+    if (fn.startsWith(AcidUtils.DELTA_PREFIX) || fn.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+      ParsedDeltaLight d = ParsedDeltaLight.parse(p);
+      return d.getMaxWriteId() < highWatermark;
+    }
+    return false;
+  }
+
   private boolean removeFiles(String location, CompactionInfo ci)
     throws NoSuchObjectException, IOException, MetaException {
     Path path = new Path(location);
@@ -439,11 +472,11 @@ public class Cleaner extends MetaStoreCompactorThread {
     return remove(location, ci, Collections.singletonList(path), ifPurge,
       path.getFileSystem(conf), extraDebugInfo);
   }
-  
-  private boolean remove(String location, CompactionInfo ci, List<Path> filesToDelete, boolean ifPurge, 
-      FileSystem fs, StringBuilder extraDebugInfo) 
+
+  private boolean remove(String location, CompactionInfo ci, List<Path> filesToDelete, boolean ifPurge,
+      FileSystem fs, StringBuilder extraDebugInfo)
       throws NoSuchObjectException, MetaException, IOException {
-    
+
     extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
     LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() +
          " obsolete directories from " + location + ". " + 
extraDebugInfo.toString());
@@ -454,7 +487,7 @@ public class Cleaner extends MetaStoreCompactorThread {
     }
     Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname);
     boolean needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
-    
+
     for (Path dead : filesToDelete) {
       LOG.debug("Going to delete path " + dead.toString());
       if (needCmRecycle) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 42c5a04..f258005 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.GetPartitionRequest;
 import org.apache.hadoop.hive.metastore.api.GetTableRequest;
 import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest;
@@ -210,9 +209,13 @@ public class TestCleaner extends CompactorTest {
     Assert.assertEquals(2, paths.size());
     boolean sawBase = false, sawDelta = false;
     for (Path p : paths) {
-      if (p.getName().equals("base_20")) sawBase = true;
-      else if (p.getName().equals(makeDeltaDirName(21, 24))) sawDelta = true;
-      else Assert.fail("Unexpected file " + p.getName());
+      if (p.getName().equals("base_20")) {
+        sawBase = true;
+      } else if (p.getName().equals(makeDeltaDirName(21, 24))) {
+        sawDelta = true;
+      } else {
+        Assert.fail("Unexpected file " + p.getName());
+      }
     }
     Assert.assertTrue(sawBase);
     Assert.assertTrue(sawDelta);
@@ -246,9 +249,13 @@ public class TestCleaner extends CompactorTest {
     Assert.assertEquals(2, paths.size());
     boolean sawBase = false, sawDelta = false;
     for (Path path : paths) {
-      if (path.getName().equals("base_20")) sawBase = true;
-      else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) sawDelta = true;
-      else Assert.fail("Unexpected file " + path.getName());
+      if (path.getName().equals("base_20")) {
+        sawBase = true;
+      } else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) {
+        sawDelta = true;
+      } else {
+        Assert.fail("Unexpected file " + path.getName());
+      }
     }
     Assert.assertTrue(sawBase);
     Assert.assertTrue(sawDelta);
@@ -722,4 +729,99 @@ public class TestCleaner extends CompactorTest {
     Assert.assertTrue(sawBase);
     Assert.assertTrue(sawDelta);
   }
+
+  @Test
+  public void withSingleBaseCleanerSucceeds() throws Exception {
+    Map<String, String> parameters = new HashMap<>();
+
+    Table t = newTable("default", "dcamc", false, parameters);
+
+    addBaseFile(t, null, 25L, 25);
+
+    burnThroughTransactions("default", "dcamc", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "dcamc", CompactionType.MAJOR);
+    compactInTxn(rqst);
+
+    startCleaner();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+  }
+
+  @Test
+  public void withNewerBaseCleanerSucceeds() throws Exception {
+    Map<String, String> parameters = new HashMap<>();
+
+    Table t = newTable("default", "dcamc", false, parameters);
+
+    addBaseFile(t, null, 25L, 25);
+
+    burnThroughTransactions("default", "dcamc", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "dcamc", CompactionType.MAJOR);
+    compactInTxn(rqst);
+
+    burnThroughTransactions("default", "dcamc", 1);
+    addBaseFile(t, null, 26L, 26);
+
+    startCleaner();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+
+    List<Path> paths = getDirectories(conf, t, null);
+    // we should retain both 25 and 26
+    Assert.assertEquals(2, paths.size());
+  }
+
+  @Test
+  public void withNotYetVisibleBase() throws Exception {
+
+    String dbName = "default";
+    String tableName = "camtc";
+    Table t = newTable(dbName, tableName, false);
+
+    addBaseFile(t, null, 20L, 20);
+    burnThroughTransactions(dbName, tableName, 25);
+
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+
+    long compactTxn = compactInTxn(rqst);
+    addBaseFile(t, null, 25L, 25, compactTxn);
+    startCleaner();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+  }
+
+  @Test
+  public void cleanMultipleTimesWithSameWatermark() throws Exception {
+    String dbName = "default";
+    String tableName = "camtc";
+    Table t = newTable(dbName, tableName, false);
+
+    addBaseFile(t, null, 20L, 20);
+    addDeltaFile(t, null, 21L, 22L, 2);
+    burnThroughTransactions(dbName, tableName, 22);
+
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+    addBaseFile(t, null, 22L, 22);
+    compactInTxn(rqst);
+    compactInTxn(rqst);
+
+    startCleaner();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(2, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState());
+
+    List<Path> paths = getDirectories(conf, t, null);
+    Assert.assertEquals(1, paths.size());
+    Assert.assertEquals("base_22", paths.get(0).getName());
+  }
 }
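
For readers who only want the gist of the patch: removeFiles() now lists the table (or partition) directory first, and when the directory scan finds no obsolete directories and nothing in that listing sits below the compaction's write-id high watermark, the Cleaner logs "nothing to remove below watermark", short-circuits, and returns true; the new tests show this leaves the compaction in the succeeded state. The sketch below is a rough, self-contained illustration of the watermark comparison only; the class name, the hand-rolled directory-name parsing, and the main() example are illustrative, whereas the real code uses AcidUtils.ParsedBaseLight.parseBase() and ParsedDeltaLight.parse() as shown in the diff above.

    public class WatermarkCheckSketch {

      // A directory is worth a cleaning pass only if its (max) write id is
      // strictly below the high watermark of the cleaner's write-id list.
      static boolean isBelowWatermark(String dirName, long highWatermark) {
        if (dirName.startsWith("base_")) {
          // e.g. "base_25" or "base_25_v0026" -> write id 25
          long writeId = Long.parseLong(dirName.substring("base_".length()).split("_")[0]);
          return writeId < highWatermark;
        }
        if (dirName.startsWith("delta_") || dirName.startsWith("delete_delta_")) {
          // e.g. "delta_21_22" -> max write id 22 (any suffixes such as statement
          // ids come after the max write id, so the index used here still holds)
          String[] parts = dirName.split("_");
          long maxWriteId = Long.parseLong(parts[dirName.startsWith("delete_") ? 3 : 2]);
          return maxWriteId < highWatermark;
        }
        return false; // anything else is left alone, as in isFileBelowWatermark()
      }

      public static void main(String[] args) {
        // Mirrors withSingleBaseCleanerSucceeds(): with a high watermark of 25,
        // a lone base_25 is not below the watermark, so the cleaner can skip,
        // while an older delta_21_22 would still trigger a real removal pass.
        System.out.println(isBelowWatermark("base_25", 25));     // false -> skip
        System.out.println(isBelowWatermark("delta_21_22", 25)); // true  -> clean
      }
    }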
