This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 919e734d3b6  HIVE-26718: Enable initiator to schedule rebalancing compactions (Laszlo Vegh, reviewed by Krisztian Kasa)
919e734d3b6 is described below

commit 919e734d3b67902a721afab1ca9a803855dbf2ec
Author: veghlaci05 <lv...@cloudera.com>
AuthorDate: Tue Jan 10 08:30:24 2023 +0100

    HIVE-26718: Enable initiator to schedule rebalancing compactions (Laszlo Vegh, reviewed by Krisztian Kasa)
---
 .../hadoop/hive/ql/txn/compactor/Initiator.java    | 52 +++++++++++++++-
 .../hadoop/hive/ql/txn/compactor/Worker.java       |  3 +-
 .../hive/ql/txn/compactor/TestInitiator.java       | 70 ++++++++++++++++++++++
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |  9 +++
 4 files changed, 131 insertions(+), 3 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 612cd492ca1..96a778dfcb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -479,8 +479,8 @@ public class Initiator extends MetaStoreCompactorThread {
     return AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
   }
 
-  private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir, Map<String,
-      String> tblproperties, long baseSize, long deltaSize) {
+  private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir,
+      Map<String, String> tblproperties, long baseSize, long deltaSize) {
     boolean noBase = false;
     List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
     if (baseSize == 0 && deltaSize > 0) {
@@ -520,6 +520,10 @@ public class Initiator extends MetaStoreCompactorThread {
       if (initiateMajor) return CompactionType.MAJOR;
     }
 
+    if (scheduleRebalance(ci, dir, tblproperties, baseSize, deltaSize)) {
+      return CompactionType.REBALANCE;
+    }
+
     String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
         HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
     int deltaNumThreshold = deltaNumProp == null ?
@@ -539,6 +543,50 @@ public class Initiator extends MetaStoreCompactorThread {
         CompactionType.MAJOR : CompactionType.MINOR;
   }
 
+  private boolean scheduleRebalance(CompactionInfo ci, AcidDirectory dir, Map<String, String> tblproperties, long baseSize, long deltaSize) {
+    // bucket size calculation can be resource intensive if there are numerous deltas, so we check for rebalance
+    // compaction only if the table is in an acceptable shape: no major compaction required. This means the number of
+    // files shouldn't be too high
+    if ("tez".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) &&
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED) &&
+        AcidUtils.isFullAcidTable(tblproperties)) {
+      long totalSize = baseSize + deltaSize;
+      long minimumSize = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE);
+      if (totalSize > minimumSize) {
+        try {
+          Map<Integer, Long> bucketSizes = new HashMap<>();
+          //compute the size of each bucket
+          dir.getFiles().stream()
+              .filter(f -> AcidUtils.bucketFileFilter.accept(f.getHdfsFileStatusWithId().getFileStatus().getPath()))
+              .forEach(
+                  f -> bucketSizes.merge(
+                      AcidUtils.parseBucketId(f.getHdfsFileStatusWithId().getFileStatus().getPath()),
+                      f.getHdfsFileStatusWithId().getFileStatus().getLen(),
+                      Long::sum));
+          final double mean = (double) totalSize / bucketSizes.size();
+
+          // calculate the standard deviation
+          double standardDeviation = Math.sqrt(
+              bucketSizes.values().stream().mapToDouble(Long::doubleValue)
+                  .reduce(0.0, (sum, num) -> Double.sum(sum, Math.pow(num - mean, 2)) / bucketSizes.size()));
+
+          double rsdThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD);
+          //Relative standard deviation: If the standard deviation is larger than rsdThreshold * average_bucket_size,
+          // a rebalancing compaction is initiated.
+          if (standardDeviation > mean * rsdThreshold) {
+            LOG.debug("Initiating REBALANCE compaction on table {}", ci.tableName);
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.error("Error occurred while checking bucket file sizes, rebalance threshold calculation is skipped.", e);
+        }
+      } else {
+        LOG.debug("Table is smaller than the minimum required size for REBALANCE compaction.");
+      }
+    }
+    return false;
+  }
+
   private long getBaseSize(AcidDirectory dir) throws IOException {
     long baseSize = 0;
     if (dir.getBase() != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index e48b7488e67..7fa1cfca97b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -169,7 +169,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
 
     boolean isEnoughToCompact;
     if (ci.isRebalanceCompaction()) {
-      //TODO: For now, we are allowing rebalance compaction regardless of the table state. Thresholds will be added later.
+      //Even though thresholds are used to schedule REBALANCE compactions, manual triggering is always allowed if
+      //the table and the query engine support it.
       return true;
     } else if (ci.isMajorCompaction()) {
       isEnoughToCompact =
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 6a9a37dfe3f..7f690dbd6fc 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -72,6 +72,76 @@ public class TestInitiator extends CompactorTest {
     startInitiator();
   }
 
+  @Test
+  public void compactRebalance() throws Exception {
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    //Set the thresholds to reach the rebalance compaction threshold without reaching the major compaction threshold.
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE, 100);
+    MetastoreConf.setDoubleVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_THRESHOLD, 0.02);
+
+    prepareRebalanceData();
+    startInitiator();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("initiated", compacts.get(0).getState());
+    Assert.assertEquals("rebalance", compacts.get(0).getTablename());
+    Assert.assertEquals(CompactionType.REBALANCE, compacts.get(0).getType());
+  }
+
+  @Test
+  public void noCompactRebalanceSmallTable() throws Exception {
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+
+    prepareRebalanceData();
+    startInitiator();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(0, compacts.size());
+  }
+
+  @Test
+  public void noCompactRebalanceDataBalanced() throws Exception {
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    //Set the minimum size so the initiator runs the check, but don't modify the rebalance threshold. No rebalance
+    //compaction should be initiated.
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE, 100);
+
+    prepareRebalanceData();
+    startInitiator();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(0, compacts.size());
+  }
+
+  private void prepareRebalanceData() throws Exception {
+    Table t = newTable("default", "rebalance", false);
+
+    addBaseFile(t, null, 200L, 200, 2, true);
+    addDeltaFile(t, null, 201L, 220L, 19, 2, false);
+
+    burnThroughTransactions("default", "rebalance", 220);
+
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "default");
+    comp.setTablename("rebalance");
+    comp.setOperationType(DataOperationType.UPDATE);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    txnHandler.lock(req);
+    long writeid = allocateWriteId("default", "rebalance", txnid);
+    Assert.assertEquals(221, writeid);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+  }
+
   @Test
   public void recoverFailedLocalWorkers() throws Exception {
     Table t = newTable("default", "rflw1", false);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 359b24bc7e6..72ed7c9ee08 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -432,6 +432,15 @@ public class MetastoreConf {
         "Time after Initiator will ignore metastore.compactor.initiator.failed.compacts.threshold " +
         "and retry with compaction again. This will try to auto heal tables with previous failed compaction " +
         "without manual intervention. Setting it to 0 or negative value will disable this feature."),
+    COMPACTOR_INITIATOR_REBALANCE_MINIMUM_SIZE("metastore.compactor.initiator.rebalance.min.size",
+        "hive.compactor.initiator.rebalance.min.size", 1024*1024*100,
+        "Minimum table/partition size for which a rebalancing compaction can be initiated."),
+    COMPACTOR_INITIATOR_REBALANCE_THRESHOLD("metastore.compactor.initiator.rebalance.threshold",
+        "hive.compactor.initiator.rebalance.threshold", 0.2d,
+        "Threshold for the rebalancing compaction. If the std_dev/average_bucket_size (where std_dev is the " +
+        "standard deviation of the bucket sizes) is larger than the threshold, a rebalance compaction is initiated. " +
+        "In other words (assuming that the value is 0.2): If the standard deviation is larger than 20% of the average " +
+        "bucket size, a rebalancing compaction is initiated."),
     COMPACTOR_RUN_AS_USER("metastore.compactor.run.as.user", "hive.compactor.run.as.user", "",
         "Specify the user to run compactor Initiator and Worker as. If empty string, defaults to table/partition " +
         "directory owner."),
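
For context, the check this patch adds to the Initiator boils down to a relative standard deviation (RSD) test over the per-bucket file sizes: a REBALANCE compaction is scheduled when std_dev > threshold * average_bucket_size, and only for tables larger than the configured minimum size. The standalone sketch below is not part of the commit; it only illustrates that math with made-up bucket sizes, and the 0.2 threshold mirrors the default of metastore.compactor.initiator.rebalance.threshold introduced above.

import java.util.Map;

// Illustrative only: mimics the RSD check the Initiator performs, using fabricated bucket sizes.
public class RebalanceThresholdSketch {
  public static void main(String[] args) {
    // Hypothetical per-bucket sizes in bytes (bucket id -> total size); in Hive these come from the AcidDirectory.
    Map<Integer, Long> bucketSizes = Map.of(0, 800L << 20, 1, 120L << 20, 2, 100L << 20);
    double rsdThreshold = 0.2; // default of metastore.compactor.initiator.rebalance.threshold

    long totalSize = bucketSizes.values().stream().mapToLong(Long::longValue).sum();
    double mean = (double) totalSize / bucketSizes.size();

    // Population standard deviation of the bucket sizes: sqrt(sum((size - mean)^2) / n)
    double variance = bucketSizes.values().stream()
        .mapToDouble(size -> Math.pow(size - mean, 2))
        .sum() / bucketSizes.size();
    double stdDev = Math.sqrt(variance);

    // Rebalance is initiated when the standard deviation exceeds rsdThreshold * average bucket size.
    boolean initiateRebalance = stdDev > mean * rsdThreshold;
    System.out.printf("mean=%.0f stdDev=%.0f rsd=%.2f -> rebalance=%b%n",
        mean, stdDev, stdDev / mean, initiateRebalance);
  }
}

With the default 0.2 threshold, even moderately skewed buckets (here roughly 800/120/100 MiB) trigger a rebalance, while a table or partition smaller than metastore.compactor.initiator.rebalance.min.size is never evaluated at all.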