This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new d3542e1  HIVE-25977: Enhance Compaction Cleaner to skip when there is nothing to do #2 (#2971) (Zoltan Haindrich reviewed by Karen Coppage and Denys Kuzmenko)
d3542e1 is described below

commit d3542e1b35bbdbaafb52ad742a5168bd29549cee
Author: Zoltan Haindrich <k...@rxd.hu>
AuthorDate: Wed Mar 9 13:04:11 2022 +0100

    HIVE-25977: Enhance Compaction Cleaner to skip when there is nothing to do #2 (#2971) (Zoltan Haindrich reviewed by Karen Coppage and Denys Kuzmenko)
---
 .../txn/compactor/TestCleanerWithReplication.java  |  4 +-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      | 38 ++++++---
 .../hive/ql/txn/compactor/CompactorTest.java       | 33 ++++++--
 .../hadoop/hive/ql/txn/compactor/TestCleaner.java  | 95 ++++++++++++++++++++++
 4 files changed, 148 insertions(+), 22 deletions(-)

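The gist of the patch: the Cleaner now short-circuits only when the compaction left no obsolete directories behind and no remaining file or directory carries a write id at or below the compaction's highest write id (the comparisons tighten from < to <= and are made against ci.highestWriteId, the current base/delta directories are filtered out up front, and stray non-directory files now count as cleanable). The standalone Java sketch below only illustrates that decision under assumed directory naming (base_N, delta_min_max, delete_delta_min_max) and hypothetical helper names; it is not the patched code, which works on AcidDirectory, ParsedBaseLight and ParsedDeltaLight.

import java.util.List;
import java.util.Set;

/**
 * Simplified, self-contained sketch of the "nothing to do" decision.
 * NOT the patched Hive code: names and inputs here are illustrative only.
 */
public class CleanerSkipSketch {

  /** Skip cleaning only if nothing is obsolete and nothing sits at or below the watermark. */
  public static boolean canSkip(List<String> obsoleteDirs, Set<String> liveAcidDirs,
      List<String> otherDirNames, boolean hasStrayFiles, long highestWriteId) {
    if (!obsoleteDirs.isEmpty() || hasStrayFiles) {
      // Obsolete directories must be dropped; non-directory files now also block the skip.
      return false;
    }
    for (String name : otherDirNames) {
      if (liveAcidDirs.contains(name)) {
        continue; // current base/delta directories are filtered out, as in the patch
      }
      if (maxWriteIdOf(name) <= highestWriteId) {
        return false; // note <=: data *at* the compaction's highest write id also blocks the skip
      }
    }
    return true;
  }

  /** Pull the (max) write id out of base_N, delta_min_max or delete_delta_min_max names. */
  private static long maxWriteIdOf(String name) {
    String[] parts = name.split("_");
    if (name.startsWith("base_")) {
      return Long.parseLong(parts[1]);
    }
    if (name.startsWith("delete_delta_")) {
      return Long.parseLong(parts[3]);
    }
    if (name.startsWith("delta_")) {
      return Long.parseLong(parts[2]);
    }
    return Long.MAX_VALUE; // unknown names never block the skip in this sketch
  }

  public static void main(String[] args) {
    // Only the live base_20 remains: the Cleaner may skip.
    System.out.println(canSkip(List.of(), Set.of("base_20"),
        List.of("base_20"), false, 22L));                          // true
    // An old delta at write id 21 (<= 22) is still on disk: cleaning must run.
    System.out.println(canSkip(List.of(), Set.of("base_22"),
        List.of("base_22", "delta_0000021_0000021"), false, 22L)); // false
  }
}

In the actual patch the "live" directories come from dir.getCurrentDirectories() and dir.getBaseDirectory(), which is why the new listStatus call filters out acidPaths before anything is checked against the watermark.
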
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
index 429d55c..6353b37 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -143,7 +143,7 @@ public class TestCleanerWithReplication extends CompactorTest {
     addDeltaFile(t, null, 23L, 24L, 2);
     addDeltaFile(t, null, 21L, 24L, 4);
 
-    burnThroughTransactions(dbName, "camitc", 25);
+    burnThroughTransactions(dbName, "camitc", 24);
 
     CompactionRequest rqst = new CompactionRequest(dbName, "camitc", CompactionType.MINOR);
     compactInTxn(rqst);
@@ -161,7 +161,7 @@ public class TestCleanerWithReplication extends CompactorTest {
     addDeltaFile(t, p, 23L, 24L, 2);
     addDeltaFile(t, p, 21L, 24L, 4);
 
-    burnThroughTransactions(dbName, "camipc", 25);
+    burnThroughTransactions(dbName, "camipc", 24);
 
     CompactionRequest rqst = new CompactionRequest(dbName, "camipc", CompactionType.MINOR);
     rqst.setPartitionname("ds=today");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 1e0dbf8..55a7802 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -62,6 +61,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedBaseLight;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -71,14 +71,15 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Callable;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_CLEANER_THREAD_NAME_FORMAT;
@@ -87,8 +88,6 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAY
 import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 
-import com.codahale.metrics.Counter;
-
 /**
  * A class to clean directories after compactions.  This will run in a separate thread.
  */
@@ -425,8 +424,10 @@ public class Cleaner extends MetaStoreCompactorThread {
       // Including obsolete directories for partitioned tables can result in data loss.
       obsoleteDirs = dir.getAbortedDirectories();
     }
-    if (obsoleteDirs.isEmpty() && !hasDataBelowWatermark(fs, path, writeIdList.getHighWatermark())) {
-      LOG.info(idWatermark(ci) + " nothing to remove below watermark " + writeIdList.getHighWatermark() + ", ");
+
+    if (obsoleteDirs.isEmpty()
+        && !hasDataBelowWatermark(dir, fs, path, ci.highestWriteId, writeIdList.getHighWatermark())) {
+      LOG.info(idWatermark(ci) + " nothing to remove below watermark " + ci.highestWriteId + ", ");
       return true;
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
@@ -439,29 +440,40 @@ public class Cleaner extends MetaStoreCompactorThread {
     return success;
   }
 
-  private boolean hasDataBelowWatermark(FileSystem fs, Path path, long highWatermark) throws IOException {
-    FileStatus[] children = fs.listStatus(path);
+  private boolean hasDataBelowWatermark(AcidDirectory acidDir, FileSystem fs, Path path, long highWatermark,
+      long minOpenTxn)
+      throws IOException {
+    Set<Path> acidPaths = new HashSet<>();
+    for (ParsedDelta delta : acidDir.getCurrentDirectories()) {
+      acidPaths.add(delta.getPath());
+    }
+    if (acidDir.getBaseDirectory() != null) {
+      acidPaths.add(acidDir.getBaseDirectory());
+    }
+    FileStatus[] children = fs.listStatus(path, p -> {
+      return !acidPaths.contains(p);
+    });
     for (FileStatus child : children) {
-      if (isFileBelowWatermark(child, highWatermark)) {
+      if (isFileBelowWatermark(child, highWatermark, minOpenTxn)) {
         return true;
       }
     }
     return false;
   }
 
-  private boolean isFileBelowWatermark(FileStatus child, long highWatermark) {
+  private boolean isFileBelowWatermark(FileStatus child, long highWatermark, long minOpenTxn) {
     Path p = child.getPath();
     String fn = p.getName();
     if (!child.isDirectory()) {
-      return false;
+      return true;
     }
     if (fn.startsWith(AcidUtils.BASE_PREFIX)) {
       ParsedBaseLight b = ParsedBaseLight.parseBase(p);
-      return b.getWriteId() < highWatermark;
+      return b.getWriteId() <= highWatermark;
     }
     if (fn.startsWith(AcidUtils.DELTA_PREFIX) || fn.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
       ParsedDeltaLight d = ParsedDeltaLight.parse(p);
-      return d.getMaxWriteId() < highWatermark;
+      return d.getMaxWriteId() <= highWatermark;
     }
     return false;
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index f4ce773..e486497 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -197,7 +197,9 @@ public abstract class CompactorTest {
               TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY);
     }
     table.setParameters(parameters);
-    if (isTemporary) table.setTemporary(true);
+    if (isTemporary) {
+      table.setTemporary(true);
+    }
 
     // drop the table first, in case some previous test created it
     ms.dropTable(dbName, tableName);
@@ -272,6 +274,11 @@ public abstract class CompactorTest {
   }
 
   protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords,
+      long visibilityId) throws Exception {
+    addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true, visibilityId);
+  }
+
+  protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords,
                               int numBuckets, boolean allBucketsPresent) throws Exception {
     addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent);
   }
@@ -288,7 +295,9 @@ public abstract class CompactorTest {
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stats = fs.listStatus(dir);
     List<Path> paths = new ArrayList<Path>(stats.length);
-    for (int i = 0; i < stats.length; i++) paths.add(stats[i].getPath());
+    for (int i = 0; i < stats.length; i++) {
+      paths.add(stats[i].getPath());
+    }
     return paths;
   }
 
@@ -398,7 +407,9 @@ public abstract class CompactorTest {
 
     FileSystem fs = FileSystem.get(conf);
     for (int bucket = 0; bucket < numBuckets; bucket++) {
-      if (bucket == 0 && !allBucketsPresent) continue; // skip one
+      if (bucket == 0 && !allBucketsPresent) {
+        continue; // skip one
+      }
       Path partFile = null;
       if (type == FileType.LEGACY) {
         partFile = new Path(location, String.format(AcidUtils.LEGACY_FILE_BUCKET_DIGITS, bucket) + "_0");
@@ -443,7 +454,9 @@ public abstract class CompactorTest {
         if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
           Path p = AcidUtils.createBucketFile(baseDirectory, bucket);
           FileSystem fs = p.getFileSystem(conf);
-          if (fs.exists(p)) filesToRead.add(p);
+          if (fs.exists(p)) {
+            filesToRead.add(p);
+          }
         } else {
           filesToRead.add(new Path(baseDirectory, "000000_0"));
 
@@ -452,7 +465,9 @@ public abstract class CompactorTest {
       for (int i = 0; i < deltaDirectory.length; i++) {
         Path p = AcidUtils.createBucketFile(deltaDirectory[i], bucket);
         FileSystem fs = p.getFileSystem(conf);
-        if (fs.exists(p)) filesToRead.add(p);
+        if (fs.exists(p)) {
+          filesToRead.add(p);
+        }
       }
       return new MockRawReader(conf, filesToRead);
     }
@@ -484,7 +499,9 @@ public abstract class CompactorTest {
 
     MockRawReader(Configuration conf, List<Path> files) throws IOException {
       filesToRead = new Stack<Path>();
-      for (Path file : files) filesToRead.push(file);
+      for (Path file : files) {
+        filesToRead.push(file);
+      }
       this.conf = conf;
       fs = FileSystem.get(conf);
     }
@@ -512,7 +529,9 @@ public abstract class CompactorTest {
     public boolean next(RecordIdentifier identifier, Text text) throws IOException {
       if (is == null) {
         // Open the next file
-        if (filesToRead.empty()) return false;
+        if (filesToRead.empty()) {
+          return false;
+        }
         Path p = filesToRead.pop();
         LOG.debug("Reading records from " + p.toString());
         is = fs.open(p);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 1a09d50..2bb8ace 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -43,8 +43,10 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -852,4 +854,97 @@ public class TestCleaner extends CompactorTest {
     Assert.assertEquals(1, paths.size());
     Assert.assertEquals("base_22", paths.get(0).getName());
   }
+
+  @Test
+  public void nothingToCleanAfterAbortsBase() throws Exception {
+    String dbName = "default";
+    String tableName = "camtc";
+    Table t = newTable(dbName, tableName, false);
+
+    addBaseFile(t, null, 20L, 1);
+    addDeltaFile(t, null, 21L, 21L, 2);
+    addDeltaFile(t, null, 22L, 22L, 2);
+    burnThroughTransactions(dbName, tableName, 22, null, new HashSet<Long>(Arrays.asList(21L, 22L)));
+
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+
+    compactInTxn(rqst);
+    compactInTxn(rqst);
+
+    startCleaner();
+    startCleaner();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(2, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState());
+
+    List<Path> paths = getDirectories(conf, t, null);
+    Assert.assertEquals(1, paths.size());
+    Assert.assertEquals("base_20", paths.get(0).getName());
+  }
+
+  @Test
+  public void nothingToCleanAfterAbortsDelta() throws Exception {
+    String dbName = "default";
+    String tableName = "camtc";
+    Table t = newTable(dbName, tableName, false);
+
+    addDeltaFile(t, null, 20L, 20L, 1);
+    addDeltaFile(t, null, 21L, 21L, 2);
+    addDeltaFile(t, null, 22L, 22L, 2);
+    burnThroughTransactions(dbName, tableName, 22, null, new HashSet<Long>(Arrays.asList(21L, 22L)));
+
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+
+    compactInTxn(rqst);
+    compactInTxn(rqst);
+
+    startCleaner();
+    startCleaner();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(2, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState());
+
+    List<Path> paths = getDirectories(conf, t, null);
+    Assert.assertEquals(1, paths.size());
+    Assert.assertEquals("delta_0000020_0000020", paths.get(0).getName());
+  }
+
+  @Test
+  public void testReady() throws Exception {
+    String dbName = "default";
+    String tblName = "trfcp";
+    String partName = "ds=today";
+    Table t = newTable(dbName, tblName, true);
+    Partition p = newPartition(t, "today");
+
+    // minor compaction
+    addBaseFile(t, p, 19L, 19);
+    addDeltaFile(t, p, 20L, 20L, 1);
+    addDeltaFile(t, p, 21L, 21L, 1);
+    addDeltaFile(t, p, 22L, 22L, 1);
+    burnThroughTransactions(dbName, tblName, 22);
+
+    // block cleaner with an open txn
+    long blockingTxn = openTxn();
+
+    CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
+    rqst.setPartitionname(partName);
+    long ctxnid = compactInTxn(rqst);
+    addDeltaFile(t, p, 20, 22, 2, ctxnid);
+    startCleaner();
+
+    // make sure cleaner didn't remove anything, and cleaning is still queued
+    List<Path> paths = getDirectories(conf, t, p);
+    Assert.assertEquals("Expected 5 files after minor compaction, instead 
these files were present " + paths, 5,
+        paths.size());
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Expected 1 compaction in queue, got: " + 
rsp.getCompacts(), 1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
rsp.getCompacts().get(0).getState());
+  }
+
+
 }
