This is an automated email from the ASF dual-hosted git repository. kgyrtkirk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 5214369 HIVE-25883: Enhance Compaction Cleaner to skip when there is nothing to do (#2958) (Zoltan Haindrich reviewed by Denys Kuzmenko) 5214369 is described below commit 5214369d05ac02f36b6723c336cbdb953ea3c61c Author: Zoltan Haindrich <k...@rxd.hu> AuthorDate: Sun Jan 23 08:49:30 2022 +0100 HIVE-25883: Enhance Compaction Cleaner to skip when there is nothing to do (#2958) (Zoltan Haindrich reviewed by Denys Kuzmenko) --- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 57 +++++++--- .../hadoop/hive/ql/txn/compactor/TestCleaner.java | 116 +++++++++++++++++++-- 2 files changed, 154 insertions(+), 19 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index db65250..8d724f6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.DataOperationType; @@ -52,6 +51,7 @@ import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable; import org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidWriteIdList; @@ -60,6 +60,8 @@ import org.apache.hadoop.hive.common.StringableMap; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedBaseLight; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.Ref; @@ -208,7 +210,7 @@ public class Cleaner extends MetaStoreCompactorThread { } Optional<String> location = Optional.ofNullable(ci.properties).map(StringableMap::new) .map(config -> config.get("location")); - + Callable<Boolean> cleanUpTask; Table t = null; Partition p = resolvePartition(ci); @@ -248,12 +250,12 @@ public class Cleaner extends MetaStoreCompactorThread { if (t != null) { StorageDescriptor sd = resolveStorageDescriptor(t, p); - cleanUpTask = () -> removeFiles(location.orElse(sd.getLocation()), minOpenTxnGLB, ci, + cleanUpTask = () -> removeFiles(location.orElse(sd.getLocation()), minOpenTxnGLB, ci, ci.partName != null && p == null); } else { cleanUpTask = () -> removeFiles(location.get(), ci); } - + Ref<Boolean> removedFiles = Ref.from(false); if (runJobAsSelf(ci.runAs)) { removedFiles.value = cleanUpTask.call(); @@ -328,7 +330,7 @@ public class Cleaner extends MetaStoreCompactorThread { if (dropPartition) { LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE); LockResponse res = null; - + try { res = txnHandler.lock(lockRequest); if (res.getState() == LockState.ACQUIRED) { @@ -349,7 +351,7 @@ public class Cleaner extends MetaStoreCompactorThread { } } } - + ValidTxnList validTxnList = TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB); //save it so that getAcidState() sees it @@ -388,7 +390,7 @@ public class Cleaner extends MetaStoreCompactorThread { // Creating 'reader' list since we are interested in the set of 'obsolete' files ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList); LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList); - + return removeFiles(location, validWriteIdList, ci); } /** @@ -419,6 +421,10 @@ public class Cleaner extends MetaStoreCompactorThread { // Including obsolete directories for partitioned tables can result in data loss. obsoleteDirs = dir.getAbortedDirectories(); } + if (obsoleteDirs.isEmpty() && !hasDataBelowWatermark(fs, path, writeIdList.getHighWatermark())) { + LOG.info(idWatermark(ci) + " nothing to remove below watermark " + writeIdList.getHighWatermark() + ", "); + return true; + } StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream() .map(Path::getName).collect(Collectors.joining(","))); boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo); @@ -428,6 +434,33 @@ public class Cleaner extends MetaStoreCompactorThread { return success; } + private boolean hasDataBelowWatermark(FileSystem fs, Path path, long highWatermark) throws IOException { + FileStatus[] children = fs.listStatus(path); + for (FileStatus child : children) { + if (isFileBelowWatermark(child, highWatermark)) { + return true; + } + } + return false; + } + + private boolean isFileBelowWatermark(FileStatus child, long highWatermark) { + Path p = child.getPath(); + String fn = p.getName(); + if (!child.isDirectory()) { + return false; + } + if (fn.startsWith(AcidUtils.BASE_PREFIX)) { + ParsedBaseLight b = ParsedBaseLight.parseBase(p); + return b.getWriteId() < highWatermark; + } + if (fn.startsWith(AcidUtils.DELTA_PREFIX) || fn.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) { + ParsedDeltaLight d = ParsedDeltaLight.parse(p); + return d.getMaxWriteId() < highWatermark; + } + return false; + } + private boolean removeFiles(String location, CompactionInfo ci) throws NoSuchObjectException, IOException, MetaException { Path path = new Path(location); @@ -439,11 +472,11 @@ public class Cleaner extends MetaStoreCompactorThread { return remove(location, ci, Collections.singletonList(path), ifPurge, path.getFileSystem(conf), extraDebugInfo); } - - private boolean remove(String location, CompactionInfo ci, List<Path> filesToDelete, boolean ifPurge, - FileSystem fs, StringBuilder extraDebugInfo) + + private boolean remove(String location, CompactionInfo ci, List<Path> filesToDelete, boolean ifPurge, + FileSystem fs, StringBuilder extraDebugInfo) throws NoSuchObjectException, MetaException, IOException { - + extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']'); LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() + " obsolete directories from " + location + ". " + extraDebugInfo.toString()); @@ -454,7 +487,7 @@ public class Cleaner extends MetaStoreCompactorThread { } Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname); boolean needCmRecycle = ReplChangeManager.isSourceOfReplication(db); - + for (Path dead : filesToDelete) { LOG.debug("Going to delete path " + dead.toString()); if (needCmRecycle) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index 42c5a04..f258005 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.GetPartitionRequest; import org.apache.hadoop.hive.metastore.api.GetTableRequest; import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest; @@ -210,9 +209,13 @@ public class TestCleaner extends CompactorTest { Assert.assertEquals(2, paths.size()); boolean sawBase = false, sawDelta = false; for (Path p : paths) { - if (p.getName().equals("base_20")) sawBase = true; - else if (p.getName().equals(makeDeltaDirName(21, 24))) sawDelta = true; - else Assert.fail("Unexpected file " + p.getName()); + if (p.getName().equals("base_20")) { + sawBase = true; + } else if (p.getName().equals(makeDeltaDirName(21, 24))) { + sawDelta = true; + } else { + Assert.fail("Unexpected file " + p.getName()); + } } Assert.assertTrue(sawBase); Assert.assertTrue(sawDelta); @@ -246,9 +249,13 @@ public class TestCleaner extends CompactorTest { Assert.assertEquals(2, paths.size()); boolean sawBase = false, sawDelta = false; for (Path path : paths) { - if (path.getName().equals("base_20")) sawBase = true; - else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) sawDelta = true; - else Assert.fail("Unexpected file " + path.getName()); + if (path.getName().equals("base_20")) { + sawBase = true; + } else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) { + sawDelta = true; + } else { + Assert.fail("Unexpected file " + path.getName()); + } } Assert.assertTrue(sawBase); Assert.assertTrue(sawDelta); @@ -722,4 +729,99 @@ public class TestCleaner extends CompactorTest { Assert.assertTrue(sawBase); Assert.assertTrue(sawDelta); } + + @Test + public void withSingleBaseCleanerSucceeds() throws Exception { + Map<String, String> parameters = new HashMap<>(); + + Table t = newTable("default", "dcamc", false, parameters); + + addBaseFile(t, null, 25L, 25); + + burnThroughTransactions("default", "dcamc", 25); + + CompactionRequest rqst = new CompactionRequest("default", "dcamc", CompactionType.MAJOR); + compactInTxn(rqst); + + startCleaner(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompactsSize()); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + } + + @Test + public void withNewerBaseCleanerSucceeds() throws Exception { + Map<String, String> parameters = new HashMap<>(); + + Table t = newTable("default", "dcamc", false, parameters); + + addBaseFile(t, null, 25L, 25); + + burnThroughTransactions("default", "dcamc", 25); + + CompactionRequest rqst = new CompactionRequest("default", "dcamc", CompactionType.MAJOR); + compactInTxn(rqst); + + burnThroughTransactions("default", "dcamc", 1); + addBaseFile(t, null, 26L, 26); + + startCleaner(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompactsSize()); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + + List<Path> paths = getDirectories(conf, t, null); + // we should retain both 25 and 26 + Assert.assertEquals(2, paths.size()); + } + + @Test + public void withNotYetVisibleBase() throws Exception { + + String dbName = "default"; + String tableName = "camtc"; + Table t = newTable(dbName, tableName, false); + + addBaseFile(t, null, 20L, 20); + burnThroughTransactions(dbName, tableName, 25); + + CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); + + long compactTxn = compactInTxn(rqst); + addBaseFile(t, null, 25L, 25, compactTxn); + startCleaner(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(1, rsp.getCompactsSize()); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + } + + @Test + public void cleanMultipleTimesWithSameWatermark() throws Exception { + String dbName = "default"; + String tableName = "camtc"; + Table t = newTable(dbName, tableName, false); + + addBaseFile(t, null, 20L, 20); + addDeltaFile(t, null, 21L, 22L, 2); + burnThroughTransactions(dbName, tableName, 22); + + CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); + addBaseFile(t, null, 22L, 22); + compactInTxn(rqst); + compactInTxn(rqst); + + startCleaner(); + + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals(2, rsp.getCompactsSize()); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState()); + + List<Path> paths = getDirectories(conf, t, null); + Assert.assertEquals(1, paths.size()); + Assert.assertEquals("base_22", paths.get(0).getName()); + } }