This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new d3542e1  HIVE-25977: Enhance Compaction Cleaner to skip when there is nothing to do #2 (#2971) (Zoltan Haindrich reviewed by Karen Coppage and Denys Kuzmenko)
d3542e1 is described below

commit d3542e1b35bbdbaafb52ad742a5168bd29549cee
Author: Zoltan Haindrich <k...@rxd.hu>
AuthorDate: Wed Mar 9 13:04:11 2022 +0100

    HIVE-25977: Enhance Compaction Cleaner to skip when there is nothing to do #2 (#2971) (Zoltan Haindrich reviewed by Karen Coppage and Denys Kuzmenko)
---
 .../txn/compactor/TestCleanerWithReplication.java  |  4 +-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      | 38 ++++++---
 .../hive/ql/txn/compactor/CompactorTest.java       | 33 ++++++--
 .../hadoop/hive/ql/txn/compactor/TestCleaner.java  | 95 ++++++++++++++++++++++
 4 files changed, 148 insertions(+), 22 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
index 429d55c..6353b37 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -143,7 +143,7 @@ public class TestCleanerWithReplication extends CompactorTest {
     addDeltaFile(t, null, 23L, 24L, 2);
     addDeltaFile(t, null, 21L, 24L, 4);
 
-    burnThroughTransactions(dbName, "camitc", 25);
+    burnThroughTransactions(dbName, "camitc", 24);
 
     CompactionRequest rqst = new CompactionRequest(dbName, "camitc", CompactionType.MINOR);
     compactInTxn(rqst);
@@ -161,7 +161,7 @@ public class TestCleanerWithReplication extends CompactorTest {
     addDeltaFile(t, p, 23L, 24L, 2);
     addDeltaFile(t, p, 21L, 24L, 4);
 
-    burnThroughTransactions(dbName, "camipc", 25);
+    burnThroughTransactions(dbName, "camipc", 24);
 
     CompactionRequest rqst = new CompactionRequest(dbName, "camipc", CompactionType.MINOR);
     rqst.setPartitionname("ds=today");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 1e0dbf8..55a7802 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -62,6 +61,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedBaseLight;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -71,14 +71,15 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Callable;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_CLEANER_THREAD_NAME_FORMAT;
@@ -87,8 +88,6 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAY
 import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 
-import com.codahale.metrics.Counter;
-
 /**
  * A class to clean directories after compactions. This will run in a separate thread.
  */
@@ -425,8 +424,10 @@ public class Cleaner extends MetaStoreCompactorThread {
         // Including obsolete directories for partitioned tables can result in data loss.
         obsoleteDirs = dir.getAbortedDirectories();
       }
-      if (obsoleteDirs.isEmpty() && !hasDataBelowWatermark(fs, path, writeIdList.getHighWatermark())) {
-        LOG.info(idWatermark(ci) + " nothing to remove below watermark " + writeIdList.getHighWatermark() + ", ");
+
+      if (obsoleteDirs.isEmpty()
+          && !hasDataBelowWatermark(dir, fs, path, ci.highestWriteId, writeIdList.getHighWatermark())) {
+        LOG.info(idWatermark(ci) + " nothing to remove below watermark " + ci.highestWriteId + ", ");
         return true;
       }
       StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
@@ -439,29 +440,40 @@ public class Cleaner extends MetaStoreCompactorThread {
     return success;
   }
 
-  private boolean hasDataBelowWatermark(FileSystem fs, Path path, long highWatermark) throws IOException {
-    FileStatus[] children = fs.listStatus(path);
+  private boolean hasDataBelowWatermark(AcidDirectory acidDir, FileSystem fs, Path path, long highWatermark,
+      long minOpenTxn)
+      throws IOException {
+    Set<Path> acidPaths = new HashSet<>();
+    for (ParsedDelta delta : acidDir.getCurrentDirectories()) {
+      acidPaths.add(delta.getPath());
+    }
+    if (acidDir.getBaseDirectory() != null) {
+      acidPaths.add(acidDir.getBaseDirectory());
+    }
+    FileStatus[] children = fs.listStatus(path, p -> {
+      return !acidPaths.contains(p);
+    });
     for (FileStatus child : children) {
-      if (isFileBelowWatermark(child, highWatermark)) {
+      if (isFileBelowWatermark(child, highWatermark, minOpenTxn)) {
         return true;
       }
     }
     return false;
   }
 
-  private boolean isFileBelowWatermark(FileStatus child, long highWatermark) {
+  private boolean isFileBelowWatermark(FileStatus child, long highWatermark, long minOpenTxn) {
     Path p = child.getPath();
     String fn = p.getName();
     if (!child.isDirectory()) {
-      return false;
+      return true;
     }
     if (fn.startsWith(AcidUtils.BASE_PREFIX)) {
       ParsedBaseLight b = ParsedBaseLight.parseBase(p);
-      return b.getWriteId() < highWatermark;
+      return b.getWriteId() <= highWatermark;
     }
     if (fn.startsWith(AcidUtils.DELTA_PREFIX) || fn.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
       ParsedDeltaLight d = ParsedDeltaLight.parse(p);
-      return d.getMaxWriteId() < highWatermark;
+      return d.getMaxWriteId() <= highWatermark;
     }
     return false;
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index f4ce773..e486497 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -197,7 +197,9 @@ public abstract class CompactorTest {
           TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY);
     }
     table.setParameters(parameters);
-    if (isTemporary) table.setTemporary(true);
+    if (isTemporary) {
+      table.setTemporary(true);
+    }
 
     // drop the table first, in case some previous test created it
     ms.dropTable(dbName, tableName);
@@ -272,6 +274,11 @@ public abstract class CompactorTest {
   }
 
   protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords,
+      long visibilityId) throws Exception {
+    addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true, visibilityId);
+  }
+
+  protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords,
       int numBuckets, boolean allBucketsPresent) throws Exception {
     addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent);
   }
@@ -288,7 +295,9 @@ public abstract class CompactorTest {
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stats = fs.listStatus(dir);
     List<Path> paths = new ArrayList<Path>(stats.length);
-    for (int i = 0; i < stats.length; i++) paths.add(stats[i].getPath());
+    for (int i = 0; i < stats.length; i++) {
+      paths.add(stats[i].getPath());
+    }
     return paths;
   }
 
@@ -398,7 +407,9 @@ public abstract class CompactorTest {
     FileSystem fs = FileSystem.get(conf);
 
     for (int bucket = 0; bucket < numBuckets; bucket++) {
-      if (bucket == 0 && !allBucketsPresent) continue; // skip one
+      if (bucket == 0 && !allBucketsPresent) {
+        continue; // skip one
+      }
       Path partFile = null;
       if (type == FileType.LEGACY) {
         partFile = new Path(location, String.format(AcidUtils.LEGACY_FILE_BUCKET_DIGITS, bucket) + "_0");
@@ -443,7 +454,9 @@ public abstract class CompactorTest {
     if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
       Path p = AcidUtils.createBucketFile(baseDirectory, bucket);
       FileSystem fs = p.getFileSystem(conf);
-      if (fs.exists(p)) filesToRead.add(p);
+      if (fs.exists(p)) {
+        filesToRead.add(p);
+      }
     } else {
       filesToRead.add(new Path(baseDirectory, "000000_0"));
 
@@ -452,7 +465,9 @@ public abstract class CompactorTest {
     for (int i = 0; i < deltaDirectory.length; i++) {
       Path p = AcidUtils.createBucketFile(deltaDirectory[i], bucket);
       FileSystem fs = p.getFileSystem(conf);
-      if (fs.exists(p)) filesToRead.add(p);
+      if (fs.exists(p)) {
+        filesToRead.add(p);
+      }
     }
     return new MockRawReader(conf, filesToRead);
   }
@@ -484,7 +499,9 @@ public abstract class CompactorTest {
     MockRawReader(Configuration conf, List<Path> files) throws IOException {
       filesToRead = new Stack<Path>();
-      for (Path file : files) filesToRead.push(file);
+      for (Path file : files) {
+        filesToRead.push(file);
+      }
       this.conf = conf;
       fs = FileSystem.get(conf);
     }
@@ -512,7 +529,9 @@ public abstract class CompactorTest {
     public boolean next(RecordIdentifier identifier, Text text) throws IOException {
       if (is == null) {
         // Open the next file
-        if (filesToRead.empty()) return false;
+        if (filesToRead.empty()) {
+          return false;
+        }
         Path p = filesToRead.pop();
         LOG.debug("Reading records from " + p.toString());
         is = fs.open(p);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 1a09d50..2bb8ace 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -43,8 +43,10 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -852,4 +854,97 @@ public class TestCleaner extends CompactorTest {
     Assert.assertEquals(1, paths.size());
     Assert.assertEquals("base_22", paths.get(0).getName());
   }
+
+  @Test
+  public void nothingToCleanAfterAbortsBase() throws Exception {
+    String dbName = "default";
+    String tableName = "camtc";
+    Table t = newTable(dbName, tableName, false);
+
+    addBaseFile(t, null, 20L, 1);
+    addDeltaFile(t, null, 21L, 21L, 2);
+    addDeltaFile(t, null, 22L, 22L, 2);
+    burnThroughTransactions(dbName, tableName, 22, null, new HashSet<Long>(Arrays.asList(21L, 22L)));
+
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+
+    compactInTxn(rqst);
+    compactInTxn(rqst);
+
+    startCleaner();
+    startCleaner();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(2, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState());
+
+    List<Path> paths = getDirectories(conf, t, null);
+    Assert.assertEquals(1, paths.size());
+    Assert.assertEquals("base_20", paths.get(0).getName());
+  }
+
+  @Test
+  public void nothingToCleanAfterAbortsDelta() throws Exception {
+    String dbName = "default";
+    String tableName = "camtc";
+    Table t = newTable(dbName, tableName, false);
+
+    addDeltaFile(t, null, 20L, 20L, 1);
+    addDeltaFile(t, null, 21L, 21L, 2);
+    addDeltaFile(t, null, 22L, 22L, 2);
+    burnThroughTransactions(dbName, tableName, 22, null, new HashSet<Long>(Arrays.asList(21L, 22L)));
+
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+
+    compactInTxn(rqst);
+    compactInTxn(rqst);
+
+    startCleaner();
+    startCleaner();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(2, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState());
+
+    List<Path> paths = getDirectories(conf, t, null);
+    Assert.assertEquals(1, paths.size());
+    Assert.assertEquals("delta_0000020_0000020", paths.get(0).getName());
+  }
+
+  @Test
+  public void testReady() throws Exception {
+    String dbName = "default";
+    String tblName = "trfcp";
+    String partName = "ds=today";
+    Table t = newTable(dbName, tblName, true);
+    Partition p = newPartition(t, "today");
+
+    // minor compaction
+    addBaseFile(t, p, 19L, 19);
+    addDeltaFile(t, p, 20L, 20L, 1);
+    addDeltaFile(t, p, 21L, 21L, 1);
+    addDeltaFile(t, p, 22L, 22L, 1);
+    burnThroughTransactions(dbName, tblName, 22);
+
+    // block cleaner with an open txn
+    long blockingTxn = openTxn();
+
+    CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
+    rqst.setPartitionname(partName);
+    long ctxnid = compactInTxn(rqst);
+    addDeltaFile(t, p, 20, 22, 2, ctxnid);
+    startCleaner();
+
+    // make sure cleaner didn't remove anything, and cleaning is still queued
+    List<Path> paths = getDirectories(conf, t, p);
+    Assert.assertEquals("Expected 5 files after minor compaction, instead these files were present " + paths, 5,
+        paths.size());
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Expected 1 compaction in queue, got: " + rsp.getCompacts(), 1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+  }
+
 }
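For readers skimming the patch, the following is a minimal standalone sketch of the "skip when there is nothing to do" decision the commit introduces. It is not Hive code: the class and method names (WatermarkCheckSketch, nothingToClean, isBelowWatermark) are invented for illustration, directory names stand in for FileStatus/AcidDirectory objects, and the patch's extra rule that stray plain files always count as cleanable is left out. What it does mirror is the two key points of the diff above: directories belonging to the current ACID state are filtered out before the scan (the listStatus filter over acidPaths), and the write-ID comparison is now <= rather than <.

// Illustrative sketch only -- simplified stand-ins, not Hive's Cleaner/AcidUtils classes.
import java.util.List;
import java.util.Set;

public class WatermarkCheckSketch {

  // Does a leftover directory hold data at or below the compaction watermark?
  // Mirrors isFileBelowWatermark in the patch (note the <= comparisons).
  static boolean isBelowWatermark(String dirName, long highWatermark) {
    if (dirName.startsWith("base_")) {
      // base_<writeId>[_v<visibilityTxnId>]
      long writeId = Long.parseLong(dirName.split("_")[1]);
      return writeId <= highWatermark;
    }
    if (dirName.startsWith("delta_") || dirName.startsWith("delete_delta_")) {
      // delta_<minWriteId>_<maxWriteId> / delete_delta_<minWriteId>_<maxWriteId>
      String[] parts = dirName.split("_");
      long maxWriteId = Long.parseLong(parts[dirName.startsWith("delete_") ? 3 : 2]);
      return maxWriteId <= highWatermark;
    }
    return false;
  }

  // The cleaner may skip (returns true) only when there are no obsolete
  // directories AND, after filtering out the directories of the current ACID
  // state, nothing remains at or below the watermark.
  static boolean nothingToClean(Set<String> currentAcidDirs, List<String> allDirs,
      List<String> obsoleteDirs, long highWatermark) {
    return obsoleteDirs.isEmpty()
        && allDirs.stream()
            .filter(d -> !currentAcidDirs.contains(d))
            .noneMatch(d -> isBelowWatermark(d, highWatermark));
  }

  public static void main(String[] args) {
    // Like nothingToCleanAfterAbortsBase: once the aborted writes 21-22 have been
    // dealt with, base_20 is the only directory and it is part of the current
    // state, so the cleaner can skip.
    System.out.println(nothingToClean(
        Set.of("base_20"), List.of("base_20"), List.of(), 22));                          // true
    // A leftover delta that is NOT part of the current state and sits at or
    // below the watermark means there is still work to do.
    System.out.println(nothingToClean(
        Set.of("base_20"), List.of("base_20", "delta_0000021_0000021"), List.of(), 22)); // false
  }
}

Filtering on the current ACID state first is what lets the second compaction in the new tests report SUCCEEDED instead of rediscovering the directories it just produced; without it, the surviving base/delta would itself sit at or below the watermark and the cleaner would never conclude it is done.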