This is an automated email from the ASF dual-hosted git repository. zabetak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
commit df45194268ffb10f9f7aae0ab4e3aec35a7b31d8 Author: Taraka Rama Rao Lethavadla <tarakaramarao.lethava...@gmail.com> AuthorDate: Tue Nov 7 19:06:27 2023 +0530 HIVE-27848: Refactor Initiator hierarchy into CompactorUtil and fix failure in TestCrudCompactorOnTez (Taraka Rama Rao Lethavadla reviewed by Stamatis Zampetakis) The work started initially to fix the TestCrudCompactorOnTez.secondCompactionShouldBeRefusedBeforeEnqueueing. However, while changing the code to address the failure, the inheritance-based design for the Initiator that was chosen in HIVE-27598 revealed some weaknesses briefly outlined below. Due to inheritance, the InitiatorBase class becomes a Thread, which doesn't really make sense and comes with additional overhead every time we instantiate it. Moreover, the only class that currently extends InitiatorBase is the Initiator and it's difficult to imagine how we can make other extensions from InitiatorBase; the code becomes complex and any subtle change in InitiatorBase may have unpredictable effects on Initiator. Having a "Base" class that is not really meant to [...] For the reasons above, the focus of the work changed from just re-enabling the test to improving and addressing the shortcomings of the inheritance-based design of Initiator. 
Close apache/hive#4859 --- .../ql/txn/compactor/TestCrudCompactorOnTez.java | 48 +++- .../compact/AlterTableCompactOperation.java | 83 +++--- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 7 + .../hive/ql/txn/compactor/CompactorUtil.java | 250 +++++++++++++++++ .../hadoop/hive/ql/txn/compactor/Initiator.java | 6 +- .../hive/ql/txn/compactor/InitiatorBase.java | 307 --------------------- 6 files changed, 357 insertions(+), 344 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java index f64fac9038d..26302e6c143 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -77,7 +77,6 @@ import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; import org.apache.orc.impl.RecordReaderImpl; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -602,7 +601,6 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { } @Test - @Ignore("HIVE-27848") public void secondCompactionShouldBeRefusedBeforeEnqueueing() throws Exception { conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); @@ -648,6 +646,52 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); } + @Test + public void secondCompactionShouldBeRefusedBeforeEnqueueingForPartition() throws Exception { + conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); + + final String dbName = "default"; + final String tableName = "compaction_test"; + executeStatementOnDriver("drop table if exists " + tableName, driver); + executeStatementOnDriver("CREATE TABLE " + tableName + 
"(id string, value string) partitioned by(pt string) CLUSTERED BY(id) " + + "INTO 10 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true')", driver); + executeStatementOnDriver("alter table " + tableName + " add partition(pt='test')",driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " partition(pt='test') values ('1','one'),('2','two'),('3','three')," + + "('4','four'),('5','five'),('6','six'),('7','seven'),('8','eight'),('9','nine'),('10','ten')," + + "('11','eleven'),('12','twelve'),('13','thirteen'),('14','fourteen'),('15','fifteen'),('16','sixteen')," + + "('17','seventeen'),('18','eighteen'),('19','nineteen'),('20','twenty')", driver); + + executeStatementOnDriver("insert into " + tableName + " partition(pt='test') values ('21', 'value21'),('84', 'value84')," + + "('66', 'value66'),('54', 'value54')", driver); + executeStatementOnDriver( + "insert into " + tableName + " partition(pt='test') values ('22', 'value22'),('34', 'value34')," + "('35', 'value35')", driver); + executeStatementOnDriver("insert into " + tableName + " partition(pt='test') values ('75', 'value75'),('99', 'value99')", driver); + + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + + //Do a compaction directly and wait for it to finish + CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR); + rqst.setPartitionname("pt=test"); + CompactionResponse resp = txnHandler.compact(rqst); + runWorker(conf); + + //Try to do a second compaction on the same table before the cleaner runs. 
+ try { + driver.run("ALTER TABLE " + tableName + " partition(pt='test') COMPACT 'major'"); + } catch (CommandProcessorException e) { + String errorMessage = ErrorMsg.COMPACTION_REFUSED.format(dbName, tableName, " partition(pt=test)", + "Compaction is already scheduled with state='ready for cleaning' and id=" + resp.getId()); + Assert.assertEquals(errorMessage, e.getCauseMessage()); + Assert.assertEquals(ErrorMsg.COMPACTION_REFUSED.getErrorCode(), e.getErrorCode()); + } + + //Check if the first compaction is in 'ready for cleaning' + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(1, compacts.size()); + Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); + } + @Test public void testMinorCompactionShouldBeRefusedOnTablesWithOriginalFiles() throws Exception { conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java index 8e86056053c..19f4b48d8c6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.ddl.table.storage.compact; +import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -27,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import 
org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.ddl.DDLOperation; @@ -35,10 +37,13 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.txn.compactor.InitiatorBase; +import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.LinkedHashMap; +import java.util.Optional; import static org.apache.hadoop.hive.ql.io.AcidUtils.compactionTypeStr2ThriftType; @@ -46,6 +51,7 @@ import static org.apache.hadoop.hive.ql.io.AcidUtils.compactionTypeStr2ThriftTyp * Operation process of compacting a table. */ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDesc> { + public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompactDesc desc) { super(context, desc); } @@ -56,6 +62,11 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, table.getDbName(), table.getTableName()); } + Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap = + convertPartitionsFromThriftToDB(getPartitions(table, desc, context)); + + TxnStore txnHandler = TxnUtils.getTxnStore(context.getConf()); + CompactionRequest compactionRequest = new CompactionRequest(table.getDbName(), table.getTableName(), compactionTypeStr2ThriftType(desc.getCompactionType())); @@ -69,40 +80,48 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe compactionRequest.setNumberOfBuckets(desc.getNumberOfBuckets()); } - InitiatorBase initiatorBase = new InitiatorBase(); - initiatorBase.setConf(context.getConf()); - 
initiatorBase.init(new AtomicBoolean()); - - Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap = - convertPartitionsFromThriftToDB(getPartitions(table, desc, context)); - - if(desc.getPartitionSpec() != null){ - Optional<String> partitionName = partitionMap.keySet().stream().findFirst(); - partitionName.ifPresent(compactionRequest::setPartitionname); - } - List<CompactionResponse> compactionResponses = - initiatorBase.initiateCompactionForTable(compactionRequest, table.getTTable(), partitionMap); - for (CompactionResponse compactionResponse : compactionResponses) { - if (!compactionResponse.isAccepted()) { - String message; - if (compactionResponse.isSetErrormessage()) { - message = compactionResponse.getErrormessage(); - throw new HiveException(ErrorMsg.COMPACTION_REFUSED, table.getDbName(), table.getTableName(), - "CompactionId: " + compactionResponse.getId(), message); - } - context.getConsole().printInfo( - "Compaction already enqueued with id " + compactionResponse.getId() + "; State is " - + compactionResponse.getState()); - continue; + //Will directly initiate compaction if an un-partitioned table/a partition is specified in the request + if (desc.getPartitionSpec() != null || !table.isPartitioned()) { + if (desc.getPartitionSpec() != null) { + Optional<String> partitionName = partitionMap.keySet().stream().findFirst(); + partitionName.ifPresent(compactionRequest::setPartitionname); } - context.getConsole().printInfo("Compaction enqueued with id " + compactionResponse.getId()); - if (desc.isBlocking() && compactionResponse.isAccepted()) { - waitForCompactionToFinish(compactionResponse, context); + CompactionResponse compactionResponse = txnHandler.compact(compactionRequest); + parseCompactionResponse(compactionResponse, table, compactionRequest.getPartitionname()); + } else { // Check for eligible partitions and initiate compaction + for (Map.Entry<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMapEntry : 
partitionMap.entrySet()) { + compactionRequest.setPartitionname(partitionMapEntry.getKey()); + CompactionResponse compactionResponse = + CompactorUtil.initiateCompactionForPartition(table.getTTable(), partitionMapEntry.getValue(), + compactionRequest, ServerUtils.hostname(), txnHandler, context.getConf()); + parseCompactionResponse(compactionResponse, table, partitionMapEntry.getKey()); } } return 0; } + private void parseCompactionResponse(CompactionResponse compactionResponse, Table table, String partitionName) + throws HiveException { + if (compactionResponse == null) { + context.getConsole().printInfo( + "Not enough deltas to initiate compaction for table=" + table.getTableName() + "partition=" + partitionName); + return; + } + if (!compactionResponse.isAccepted()) { + if (compactionResponse.isSetErrormessage()) { + throw new HiveException(ErrorMsg.COMPACTION_REFUSED, table.getDbName(), table.getTableName(), + partitionName == null ? "" : " partition(" + partitionName + ")", compactionResponse.getErrormessage()); + } + context.getConsole().printInfo( + "Compaction already enqueued with id " + compactionResponse.getId() + "; State is " + compactionResponse.getState()); + return; + } + context.getConsole().printInfo("Compaction enqueued with id " + compactionResponse.getId()); + if (desc.isBlocking() && compactionResponse.isAccepted()) { + waitForCompactionToFinish(compactionResponse, context); + } + } + private List<Partition> getPartitions(Table table, AlterTableCompactDesc desc, DDLOperationContext context) throws HiveException { List<Partition> partitions = new ArrayList<>(); @@ -117,7 +136,7 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe partitions = context.getDb().getPartitions(table, partitionSpec); if (partitions.size() > 1) { throw new HiveException(ErrorMsg.TOO_MANY_COMPACTION_PARTITIONS); - } else if (partitions.size() == 0) { + } else if (partitions.isEmpty()) { throw new 
HiveException(ErrorMsg.INVALID_PARTITION_SPEC); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 4c63a74d853..7b7aac818e9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -1417,6 +1417,13 @@ public class AcidUtils { return directory; } + public static AcidDirectory getAcidState(StorageDescriptor sd, ValidWriteIdList writeIds, HiveConf conf) + throws IOException { + Path location = new Path(sd.getLocation()); + FileSystem fs = location.getFileSystem(conf); + return getAcidState(fs, location, conf, writeIds, Ref.from(false), false); + } + private static void findBestWorkingDeltas(ValidWriteIdList writeIdList, AcidDirectory directory) { Collections.sort(directory.getCurrentDirectories()); //so now, 'current directories' should be sorted like delta_5_20 delta_5_10 delta_11_20 delta_51_60 for example diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java index c8d86c7ea27..70c550375a2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java @@ -17,38 +17,64 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.LockComponentBuilder; import org.apache.hadoop.hive.metastore.LockRequestBuilder; 
+import org.apache.hadoop.hive.metastore.api.CompactionResponse; +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; import org.apache.hadoop.hive.metastore.api.LockRequest; import org.apache.hadoop.hive.metastore.api.LockType; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; + +import org.apache.hadoop.hive.metastore.metrics.AcidMetricService; +import org.apache.hadoop.hive.metastore.metrics.Metrics; +import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; +import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.metastore.utils.StringableMap; import org.apache.hadoop.hive.ql.io.AcidDirectory; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.common.util.Ref; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.HashMap; import 
java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -59,6 +85,8 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa public class CompactorUtil { private static final Logger LOG = LoggerFactory.getLogger(CompactorUtil.class); public static final String COMPACTOR = "compactor"; + private static final String COMPACTOR_THRESHOLD_PREFIX = "compactorthreshold."; + /** * List of accepted properties for defining the compactor's job queue. * @@ -295,4 +323,226 @@ public class CompactorUtil { !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK)); return requestBuilder.build(); } + + private static CompactionResponse requestCompaction(CompactionInfo ci, String runAs, String hostname, + TxnStore txnHandler) throws MetaException { + CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, ci.tableName, ci.type); + if (ci.partName != null) + compactionRequest.setPartitionname(ci.partName); + compactionRequest.setRunas(runAs); + if (StringUtils.isEmpty(ci.initiatorId)) { + compactionRequest.setInitiatorId(hostname + "-" + Thread.currentThread().getId()); + } else { + compactionRequest.setInitiatorId(ci.initiatorId); + } + compactionRequest.setInitiatorVersion(ci.initiatorVersion); + compactionRequest.setPoolName(ci.poolName); + LOG.info("Requesting compaction: " + compactionRequest); + CompactionResponse resp = txnHandler.compact(compactionRequest); + if (resp.isAccepted()) { + ci.id = resp.getId(); + } + return resp; + } + + private static CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir, + Map<String, String> tblProperties, long baseSize, long deltaSize, HiveConf conf) { + boolean noBase = false; + List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories(); + if (baseSize == 0 && deltaSize > 0) { + noBase = 
true; + } else { + String deltaPctProp = + tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD); + float deltaPctThreshold = deltaPctProp == null ? HiveConf.getFloatVar(conf, + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : Float.parseFloat(deltaPctProp); + boolean bigEnough = (float) deltaSize / (float) baseSize > deltaPctThreshold; + boolean multiBase = dir.getObsolete().stream().anyMatch(path -> path.getName().startsWith(AcidUtils.BASE_PREFIX)); + + boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase); + if (LOG.isDebugEnabled()) { + StringBuilder msg = new StringBuilder("delta size: "); + msg.append(deltaSize); + msg.append(" base size: "); + msg.append(baseSize); + msg.append(" multiBase "); + msg.append(multiBase); + msg.append(" deltaSize "); + msg.append(deltaSize); + msg.append(" threshold: "); + msg.append(deltaPctThreshold); + msg.append(" delta/base ratio > ").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname) + .append(": "); + msg.append(bigEnough); + msg.append("."); + if (!initiateMajor) { + msg.append("not"); + } + msg.append(" initiating major compaction."); + LOG.debug(msg.toString()); + } + if (initiateMajor) + return CompactionType.MAJOR; + } + + String deltaNumProp = + tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD); + int deltaNumThreshold = deltaNumProp == null ? HiveConf.getIntVar(conf, + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : Integer.parseInt(deltaNumProp); + boolean enough = deltas.size() > deltaNumThreshold; + if (!enough) { + LOG.debug( + "Not enough deltas to initiate compaction for table=" + ci.tableName + "partition=" + ci.partName + + ". Found: " + deltas.size() + " deltas, threshold is " + deltaNumThreshold); + return null; + } + // If there's no base file, do a major compaction + LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? 
"no" : "has") + " base," + "requesting " + + (noBase ? "major" : "minor") + " compaction"); + + return noBase || !isMinorCompactionSupported(conf, tblProperties, + dir) ? CompactionType.MAJOR : CompactionType.MINOR; + } + + private static long getBaseSize(AcidDirectory dir) throws IOException { + long baseSize = 0; + if (dir.getBase() != null) { + baseSize = getDirSize(dir.getFs(), dir.getBase()); + } else { + for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) { + baseSize += origStat.getFileStatus().getLen(); + } + } + return baseSize; + } + + private static long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) throws IOException { + return dir.getFiles(fs, Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus) + .mapToLong(FileStatus::getLen).sum(); + } + + private static CompactionType checkForCompaction(final CompactionInfo ci, final ValidWriteIdList writeIds, + final StorageDescriptor sd, final Map<String, String> tblProperties, final String runAs, TxnStore txnHandler, + HiveConf conf) throws IOException, InterruptedException { + // If it's marked as too many aborted, we already know we need to compact + if (ci.tooManyAborts) { + LOG.debug("Found too many aborted transactions for " + + ci.getFullPartitionName() + ", " + "initiating major compaction"); + return CompactionType.MAJOR; + } + + if (ci.hasOldAbort) { + HiveConf.ConfVars oldAbortedTimeoutProp = HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD; + LOG.debug( + "Found an aborted transaction for " + ci.getFullPartitionName() + " with age older than threshold " + + oldAbortedTimeoutProp + ": " + conf.getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. 
" + + "Initiating minor compaction."); + return CompactionType.MINOR; + } + AcidDirectory acidDirectory = AcidUtils.getAcidState(sd, writeIds, conf); + long baseSize = getBaseSize(acidDirectory); + FileSystem fs = acidDirectory.getFs(); + Map<Path, Long> deltaSizes = new HashMap<>(); + for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) { + deltaSizes.put(delta.getPath(), getDirSize(fs, delta)); + } + long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum); + AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, ci.partName, conf, txnHandler, baseSize, + deltaSizes, acidDirectory.getObsolete()); + + if (runJobAsSelf(runAs)) { + return determineCompactionType(ci, acidDirectory, tblProperties, baseSize, deltaSize, conf); + } else { + LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName()); + UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, UserGroupInformation.getLoginUser()); + CompactionType compactionType; + try { + compactionType = ugi.doAs( + (PrivilegedExceptionAction<CompactionType>) () -> determineCompactionType(ci, acidDirectory, tblProperties, + baseSize, deltaSize, conf)); + } finally { + try { + FileSystem.closeAllForUGI(ugi); + } catch (IOException exception) { + LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + ci.getFullPartitionName(), + exception); + } + } + return compactionType; + } + } + + private static ValidWriteIdList resolveValidWriteIds(Table t, TxnStore txnHandler, HiveConf conf) + throws NoSuchTxnException, MetaException { + ValidTxnList validTxnList = new ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY)); + // The response will have one entry per table and hence we get only one ValidWriteIdList + String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName()); + GetValidWriteIdsRequest validWriteIdsRequest = + new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); + 
validWriteIdsRequest.setValidTxnList(validTxnList.writeToString()); + + return TxnUtils.createValidCompactWriteIdList( + txnHandler.getValidWriteIds(validWriteIdsRequest).getTblValidWriteIds().get(0)); + } + + public static CompactionResponse scheduleCompactionIfRequired(CompactionInfo ci, Table t, Partition p, String runAs, + boolean metricsEnabled, String hostName, TxnStore txnHandler, HiveConf conf) throws MetaException { + StorageDescriptor sd = resolveStorageDescriptor(t, p); + try { + ValidWriteIdList validWriteIds = resolveValidWriteIds(t, txnHandler, conf); + + checkInterrupt(Initiator.class.getName()); + + CompactionType type = checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs, txnHandler, conf); + if (type != null) { + ci.type = type; + return requestCompaction(ci, runAs, hostName, txnHandler); + } + } catch (InterruptedException e) { + //Handle InterruptedException separately so the compactionInfo won't be marked as failed. + LOG.info("Initiator pool is being shut down, task received interruption."); + } catch (Throwable ex) { + String errorMessage = "Caught exception while trying to determine if we should compact " + ci + + ". 
Marking " + "failed to avoid repeated failures, " + ex; + LOG.error(errorMessage); + ci.errorMessage = errorMessage; + if (metricsEnabled) { + Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc(); + } + txnHandler.markFailed(ci); + } + return null; + } + + public static CompactionResponse initiateCompactionForPartition(Table table, Partition partition, + CompactionRequest compactionRequest, String hostName, TxnStore txnHandler, HiveConf inputConf) throws MetaException { + ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0); + HiveConf conf = new HiveConf(inputConf); + conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); + CompactionResponse compactionResponse; + CompactionInfo compactionInfo = + new CompactionInfo(table.getDbName(), table.getTableName(), compactionRequest.getPartitionname(), + compactionRequest.getType()); + compactionInfo.initiatorId = compactionRequest.getInitiatorId(); + compactionInfo.orderByClause = compactionRequest.getOrderByClause(); + compactionInfo.initiatorVersion = compactionRequest.getInitiatorVersion(); + if (compactionRequest.getNumberOfBuckets() > 0) { + compactionInfo.numberOfBuckets = compactionRequest.getNumberOfBuckets(); + } + compactionInfo.poolName = compactionRequest.getPoolName(); + try { + StorageDescriptor sd = resolveStorageDescriptor(table, partition); + String runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf); + LOG.info("Checking to see if we should compact partition {} of table {}.{}", compactionInfo.partName, + table.getDbName(), table.getTableName()); + compactionResponse = + scheduleCompactionIfRequired(compactionInfo, table, partition, runAs, false, hostName, + txnHandler, conf); + } catch (IOException | InterruptedException | MetaException e) { + LOG.error("Error occurred while Checking if we should compact partition {} of table {}.{} Exception: {}", + compactionInfo.partName, table.getDbName(), 
table.getTableName(), e.getMessage()); + throw new RuntimeException(e); + } + return compactionResponse; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 58cb478bbe8..75c435a3123 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -51,7 +51,7 @@ import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_INTIATOR_THREAD_NA * A class to initiate compactions. This will run in a separate thread. * It's critical that there exactly 1 of these in a given warehouse. */ -public class Initiator extends InitiatorBase { +public class Initiator extends MetaStoreCompactorThread { static final private String CLASS_NAME = Initiator.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); @@ -153,11 +153,11 @@ public class Initiator extends InitiatorBase { * Therefore, using a thread pool here and running checkForCompactions in parallel */ String tableName = ci.getFullTableName(); String partition = ci.getFullPartitionName(); - + ci.initiatorVersion = this.runtimeVersion; CompletableFuture<Void> asyncJob = CompletableFuture.runAsync( CompactorUtil.ThrowingRunnable.unchecked(() -> - scheduleCompactionIfRequired(ci, t, p, runAs, metricsEnabled)), compactionExecutor) + CompactorUtil.scheduleCompactionIfRequired(ci, t, p, runAs, metricsEnabled, hostName, txnHandler, conf)), compactionExecutor) .exceptionally(exc -> { LOG.error("Error while running scheduling the compaction on the table {} / partition {}.", tableName, partition, exc); return null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java deleted file mode 100644 index 8f632dbd398..00000000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java +++ 
/dev/null @@ -1,307 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.txn.compactor; - -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.ValidReadTxnList; -import org.apache.hadoop.hive.common.ValidTxnList; -import org.apache.hadoop.hive.common.ValidWriteIdList; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.CompactionRequest; -import org.apache.hadoop.hive.metastore.api.CompactionResponse; -import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.metrics.AcidMetricService; -import 
org.apache.hadoop.hive.metastore.metrics.Metrics; -import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; -import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; -import org.apache.hadoop.hive.metastore.txn.TxnUtils; -import org.apache.hadoop.hive.ql.io.AcidDirectory; -import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.shims.HadoopShims; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hive.common.util.Ref; - -import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -public class InitiatorBase extends MetaStoreCompactorThread { - - static final private String COMPACTOR_THRESHOLD_PREFIX = "compactorthreshold."; - - private List<CompactionResponse> initiateCompactionForMultiplePartitions(Table table, - Map<String, Partition> partitions, CompactionRequest request) { - List<CompactionResponse> compactionResponses = new ArrayList<>(); - partitions.entrySet().parallelStream().forEach(entry -> { - try { - StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(table, entry.getValue()); - String runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf); - CompactionInfo ci = - new CompactionInfo(table.getDbName(), table.getTableName(), entry.getKey(), request.getType()); - ci.initiatorId = request.getInitiatorId(); - ci.orderByClause = request.getOrderByClause(); - ci.initiatorVersion = request.getInitiatorVersion(); - if (request.getNumberOfBuckets() > 0) { - ci.numberOfBuckets = request.getNumberOfBuckets(); - } - ci.poolName = request.getPoolName(); - LOG.info( - "Checking to see if we should compact partition " + entry.getKey() + " of table " + table.getDbName() + "." 
- + table.getTableName()); - CollectionUtils.addIgnoreNull(compactionResponses, - scheduleCompactionIfRequired(ci, table, entry.getValue(), runAs, false)); - } catch (IOException | InterruptedException | MetaException e) { - LOG.error( - "Error occurred while Checking if we should compact partition " + entry.getKey() + " of table " + table.getDbName() + "." - + table.getTableName() + " Exception: " + e.getMessage()); - throw new RuntimeException(e); - } - }); - return compactionResponses; - } - - public List<CompactionResponse> initiateCompactionForTable(CompactionRequest request, Table table, Map<String, Partition> partitions) throws Exception { - ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0); - conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString()); - - if (request.getPartitionname()!= null || partitions.isEmpty()) { - List<CompactionResponse> responses = new ArrayList<>(); - responses.add(txnHandler.compact(request)); - return responses; - } else { - return initiateCompactionForMultiplePartitions(table, partitions, request); - } - } - - @Override protected boolean isCacheEnabled() { - return false; - } - - private String getInitiatorId(long threadId) { - return this.hostName + "-" + threadId; - } - - private CompactionResponse requestCompaction(CompactionInfo ci, String runAs) throws MetaException { - CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, ci.tableName, ci.type); - if (ci.partName != null) - compactionRequest.setPartitionname(ci.partName); - compactionRequest.setRunas(runAs); - if (StringUtils.isEmpty(ci.initiatorId)) { - compactionRequest.setInitiatorId(getInitiatorId(Thread.currentThread().getId())); - } else { - compactionRequest.setInitiatorId(ci.initiatorId); - } - compactionRequest.setInitiatorVersion(this.runtimeVersion); - compactionRequest.setPoolName(ci.poolName); - LOG.info("Requesting compaction: " + compactionRequest); - CompactionResponse resp = 
txnHandler.compact(compactionRequest); - if (resp.isAccepted()) { - ci.id = resp.getId(); - } - return resp; - } - - private AcidDirectory getAcidDirectory(StorageDescriptor sd, ValidWriteIdList writeIds) throws IOException { - Path location = new Path(sd.getLocation()); - FileSystem fs = location.getFileSystem(conf); - return AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false); - } - - private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir, - Map<String, String> tblProperties, long baseSize, long deltaSize) { - boolean noBase = false; - List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories(); - if (baseSize == 0 && deltaSize > 0) { - noBase = true; - } else { - String deltaPctProp = - tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD); - float deltaPctThreshold = deltaPctProp == null ? HiveConf.getFloatVar(conf, - HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : Float.parseFloat(deltaPctProp); - boolean bigEnough = (float) deltaSize / (float) baseSize > deltaPctThreshold; - boolean multiBase = dir.getObsolete().stream().anyMatch(path -> path.getName().startsWith(AcidUtils.BASE_PREFIX)); - - boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase); - if (LOG.isDebugEnabled()) { - StringBuilder msg = new StringBuilder("delta size: "); - msg.append(deltaSize); - msg.append(" base size: "); - msg.append(baseSize); - msg.append(" multiBase "); - msg.append(multiBase); - msg.append(" deltaSize "); - msg.append(deltaSize); - msg.append(" threshold: "); - msg.append(deltaPctThreshold); - msg.append(" delta/base ratio > ").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname) - .append(": "); - msg.append(bigEnough); - msg.append("."); - if (!initiateMajor) { - msg.append("not"); - } - msg.append(" initiating major compaction."); - LOG.debug(msg.toString()); - } - if (initiateMajor) - return CompactionType.MAJOR; - } - - String 
deltaNumProp = - tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD); - int deltaNumThreshold = deltaNumProp == null ? HiveConf.getIntVar(conf, - HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : Integer.parseInt(deltaNumProp); - boolean enough = deltas.size() > deltaNumThreshold; - if (!enough) { - LOG.debug("Not enough deltas to initiate compaction for table=" + ci.tableName + "partition=" + ci.partName - + ". Found: " + deltas.size() + " deltas, threshold is " + deltaNumThreshold); - return null; - } - // If there's no base file, do a major compaction - LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" : "has") + " base," + "requesting " - + (noBase ? "major" : "minor") + " compaction"); - - return noBase || !CompactorUtil.isMinorCompactionSupported(conf, tblProperties, dir) ? CompactionType.MAJOR : CompactionType.MINOR; - } - - private long getBaseSize(AcidDirectory dir) throws IOException { - long baseSize = 0; - if (dir.getBase() != null) { - baseSize = getDirSize(dir.getFs(), dir.getBase()); - } else { - for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) { - baseSize += origStat.getFileStatus().getLen(); - } - } - return baseSize; - } - - private long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) throws IOException { - return dir.getFiles(fs, Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus) - .mapToLong(FileStatus::getLen).sum(); - } - - private CompactionType checkForCompaction(final CompactionInfo ci, final ValidWriteIdList writeIds, - final StorageDescriptor sd, final Map<String, String> tblProperties, final String runAs) - throws IOException, InterruptedException { - // If it's marked as too many aborted, we already know we need to compact - if (ci.tooManyAborts) { - LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", " - + "initiating major compaction"); - return 
CompactionType.MAJOR; - } - - if (ci.hasOldAbort) { - HiveConf.ConfVars oldAbortedTimeoutProp = HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD; - LOG.debug("Found an aborted transaction for " + ci.getFullPartitionName() + " with age older than threshold " - + oldAbortedTimeoutProp + ": " + conf.getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. " - + "Initiating minor compaction."); - return CompactionType.MINOR; - } - AcidDirectory acidDirectory = getAcidDirectory(sd, writeIds); - long baseSize = getBaseSize(acidDirectory); - FileSystem fs = acidDirectory.getFs(); - Map<Path, Long> deltaSizes = new HashMap<>(); - for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) { - deltaSizes.put(delta.getPath(), getDirSize(fs, delta)); - } - long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum); - AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, ci.partName, conf, txnHandler, baseSize, - deltaSizes, acidDirectory.getObsolete()); - - if (CompactorUtil.runJobAsSelf(runAs)) { - return determineCompactionType(ci, acidDirectory, tblProperties, baseSize, deltaSize); - } else { - LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName()); - UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, UserGroupInformation.getLoginUser()); - CompactionType compactionType; - try { - compactionType = ugi.doAs( - (PrivilegedExceptionAction<CompactionType>) () -> determineCompactionType(ci, acidDirectory, tblProperties, - baseSize, deltaSize)); - } finally { - try { - FileSystem.closeAllForUGI(ugi); - } catch (IOException exception) { - LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + ci.getFullPartitionName(), - exception); - } - } - return compactionType; - } - } - - private ValidWriteIdList resolveValidWriteIds(Table t) - throws NoSuchTxnException, MetaException { - ValidTxnList validTxnList = new 
ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY)); - // The response will have one entry per table and hence we get only one ValidWriteIdList - String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName()); - GetValidWriteIdsRequest validWriteIdsRequest = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName)); - validWriteIdsRequest.setValidTxnList(validTxnList.writeToString()); - - return TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(validWriteIdsRequest).getTblValidWriteIds().get(0)); - } - - protected CompactionResponse scheduleCompactionIfRequired(CompactionInfo ci, Table t, - Partition p, String runAs, boolean metricsEnabled) - throws MetaException { - StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(t, p); - try { - ValidWriteIdList validWriteIds = resolveValidWriteIds(t); - - CompactorUtil.checkInterrupt(InitiatorBase.class.getName()); - - CompactionType type = checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs); - if (type != null) { - ci.type = type; - return requestCompaction(ci, runAs); - } - } catch (InterruptedException e) { - //Handle InterruptedException separately so the compactionInfo won't be marked as failed. - LOG.info("Initiator pool is being shut down, task received interruption."); - } catch (Throwable ex) { - String errorMessage = "Caught exception while trying to determine if we should compact " + ci + ". Marking " - + "failed to avoid repeated failures, " + ex; - LOG.error(errorMessage); - ci.errorMessage = errorMessage; - if (metricsEnabled) { - Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc(); - } - txnHandler.markFailed(ci); - } - return null; - } - -}