This is an automated email from the ASF dual-hosted git repository.

veghlaci05 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new d2d4b86433e  HIVE-27598 - Enhance alter table compact to work for partitioned tables (Taraka Rama Rao Lethavadla, reviewed by Laszlo Vegh)
d2d4b86433e is described below

commit d2d4b86433ef356bf738edd72bfef74511a15444
Author: tarak271 <tarakaramarao.lethava...@gmail.com>
AuthorDate: Tue Oct 31 15:08:20 2023 +0530

    HIVE-27598 - Enhance alter table compact to work for partitioned tables (Taraka Rama Rao Lethavadla, reviewed by Laszlo Vegh)
---
 .../compact/AlterTableCompactOperation.java        | 130 ++++---
 .../hadoop/hive/ql/txn/compactor/Initiator.java    | 249 +------------
 .../hive/ql/txn/compactor/InitiatorBase.java       | 309 ++++++++++++++++
 .../queries/clientpositive/manual_compaction.q     |  68 ++++
 .../clientpositive/llap/manual_compaction.q.out    | 387 +++++++++++++++++++++
 5 files changed, 843 insertions(+), 300 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
index 9187101367b..cc896331aff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
@@ -18,27 +18,27 @@ package org.apache.hadoop.hive.ql.ddl.table.storage.compact;
 
-import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.utils.JavaUtils;
-import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-
-import java.util.List;
-import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.DDLOperation;
+import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.txn.compactor.InitiatorBase;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hive.ql.io.AcidUtils.compactionTypeStr2ThriftType;
 
@@ -50,80 +50,95 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe
     super(context, desc);
   }
 
-  @Override
-  public int execute() throws HiveException {
+  @Override public int execute() throws Exception {
     Table table = context.getDb().getTable(desc.getTableName());
     if (!AcidUtils.isTransactionalTable(table)) {
       throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, table.getDbName(), table.getTableName());
     }
 
-    String partitionName = getPartitionName(table);
+    CompactionRequest compactionRequest = new CompactionRequest(table.getDbName(), table.getTableName(),
+        compactionTypeStr2ThriftType(desc.getCompactionType()));
 
-    CompactionResponse resp = compact(table, partitionName);
-    if (!resp.isAccepted()) {
-      String message = Constants.ERROR_MESSAGE_NO_DETAILS_AVAILABLE;
-      if (resp.isSetErrormessage()) {
-        message = resp.getErrormessage();
-      }
-      throw new HiveException(ErrorMsg.COMPACTION_REFUSED,
-          table.getDbName(), table.getTableName(), partitionName == null ? "" : "(partition=" + partitionName + ")", message);
-    }
+    compactionRequest.setPoolName(desc.getPoolName());
+    compactionRequest.setProperties(desc.getProperties());
+    compactionRequest.setInitiatorId(JavaUtils.hostname() + "-" + HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION);
+    compactionRequest.setInitiatorVersion(HiveMetaStoreClient.class.getPackage().getImplementationVersion());
+    compactionRequest.setOrderByClause(desc.getOrderByClause());
 
-    if (desc.isBlocking() && resp.isAccepted()) {
-      waitForCompactionToFinish(resp);
+    if (desc.getNumberOfBuckets() > 0) {
+      compactionRequest.setNumberOfBuckets(desc.getNumberOfBuckets());
     }
+    InitiatorBase initiatorBase = new InitiatorBase();
+    initiatorBase.setConf(context.getConf());
+    initiatorBase.init(new AtomicBoolean());
+
+    Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
+        convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
+
+    if(desc.getPartitionSpec() != null){
+      Optional<String> partitionName = partitionMap.keySet().stream().findFirst();
+      partitionName.ifPresent(compactionRequest::setPartitionname);
+    }
+    List<CompactionResponse> compactionResponses =
+        initiatorBase.initiateCompactionForTable(compactionRequest, table.getTTable(), partitionMap);
+    for (CompactionResponse compactionResponse : compactionResponses) {
+      if (!compactionResponse.isAccepted()) {
+        String message;
+        if (compactionResponse.isSetErrormessage()) {
+          message = compactionResponse.getErrormessage();
+          throw new HiveException(ErrorMsg.COMPACTION_REFUSED, table.getDbName(), table.getTableName(),
+              "CompactionId: " + compactionResponse.getId(), message);
+        }
+        context.getConsole().printInfo(
+            "Compaction already enqueued with id " + compactionResponse.getId() + "; State is "
+                + compactionResponse.getState());
+        continue;
+      }
+      context.getConsole().printInfo("Compaction enqueued with id " + compactionResponse.getId());
+      if (desc.isBlocking() && compactionResponse.isAccepted()) {
+        waitForCompactionToFinish(compactionResponse, context);
+      }
+    }
     return 0;
   }
 
-  private String getPartitionName(Table table) throws HiveException {
-    String partitionName = null;
+  private List<Partition> getPartitions(Table table, AlterTableCompactDesc desc, DDLOperationContext context)
+      throws HiveException {
+    List<Partition> partitions = new ArrayList<>();
+
     if (desc.getPartitionSpec() == null) {
-      if (table.isPartitioned()) { // Compaction can only be done on the whole table if the table is non-partitioned.
-        throw new HiveException(ErrorMsg.NO_COMPACTION_PARTITION);
+      if (table.isPartitioned()) {
+        // Compaction will get initiated for all the potential partitions that meets the criteria
+        partitions = context.getDb().getPartitions(table);
       }
     } else {
       Map<String, String> partitionSpec = desc.getPartitionSpec();
-      List<Partition> partitions = context.getDb().getPartitions(table, partitionSpec);
+      partitions = context.getDb().getPartitions(table, partitionSpec);
       if (partitions.size() > 1) {
         throw new HiveException(ErrorMsg.TOO_MANY_COMPACTION_PARTITIONS);
       } else if (partitions.size() == 0) {
        throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
       }
-      partitionName = partitions.get(0).getName();
     }
-    return partitionName;
+    return partitions;
   }
 
-  private CompactionResponse compact(Table table, String partitionName) throws HiveException {
-    CompactionRequest req = new CompactionRequest(table.getDbName(), table.getTableName(),
-        compactionTypeStr2ThriftType(desc.getCompactionType()));
-    req.setPartitionname(partitionName);
-    req.setPoolName(desc.getPoolName());
-    req.setProperties(desc.getProperties());
-    req.setInitiatorId(JavaUtils.hostname() + "-" + HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION);
-    req.setInitiatorVersion(HiveMetaStoreClient.class.getPackage().getImplementationVersion());
-    req.setOrderByClause(desc.getOrderByClause());
-    if (desc.getNumberOfBuckets() > 0) {
-      req.setNumberOfBuckets(desc.getNumberOfBuckets());
-    }
-    CompactionResponse resp = context.getDb().compact(req);
-    if (resp.isAccepted()) {
-      context.getConsole().printInfo("Compaction enqueued with id " + resp.getId());
-    } else {
-      context.getConsole().printInfo("Compaction already enqueued with id " + resp.getId() + "; State is " +
-          resp.getState());
-    }
-    return resp;
+  private Map<String, org.apache.hadoop.hive.metastore.api.Partition> convertPartitionsFromThriftToDB(
+      List<Partition> partitions) {
+    Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap = new LinkedHashMap<>();
+    partitions.forEach(partition -> partitionMap.put(partition.getName(), partition.getTPartition()));
+    return partitionMap;
   }
 
-  private void waitForCompactionToFinish(CompactionResponse resp) throws HiveException {
+  private void waitForCompactionToFinish(CompactionResponse resp, DDLOperationContext context) throws HiveException {
     StringBuilder progressDots = new StringBuilder();
     long waitTimeMs = 1000;
     long waitTimeOut = HiveConf.getLongVar(context.getConf(), HiveConf.ConfVars.HIVE_COMPACTOR_WAIT_TIMEOUT);
-    wait: while (true) {
+    wait:
+    while (true) {
       //double wait time until 5min
-      waitTimeMs = waitTimeMs*2;
+      waitTimeMs = waitTimeMs * 2;
       waitTimeMs = Math.min(waitTimeMs, waitTimeOut);
       try {
         Thread.sleep(waitTimeMs);
@@ -133,10 +148,11 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe
       }
       ShowCompactRequest request = new ShowCompactRequest();
       request.setId(resp.getId());
-
+
       ShowCompactResponse compaction = context.getDb().showCompactions(request);
       if (compaction.getCompactsSize() == 1) {
         ShowCompactResponseElement comp = compaction.getCompacts().get(0);
+        LOG.debug("Response for cid: "+comp.getId()+" is "+comp.getState());
         switch (comp.getState()) {
           case TxnStore.WORKING_RESPONSE:
           case TxnStore.INITIATED_RESPONSE:
@@ -146,11 +162,11 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe
             continue wait;
           default: //done
-            context.getConsole().printInfo("Compaction with id " + resp.getId() + " finished with status: " +
-                comp.getState());
+            context.getConsole()
+                .printInfo("Compaction with id " + resp.getId() + " finished with status: " + comp.getState());
             break wait;
         }
-      }else {
+      } else {
         throw new HiveException("No suitable compaction found");
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a86c18baad9..bb48c8f219b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -20,33 +20,12 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ServerUtils;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionResponse;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
-import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
@@ -54,28 +33,14 @@ import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDirectory;
-import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.LongSummaryStatistics;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -86,12 +51,10 @@ import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_INTIATOR_THREAD_NA
  * A class to initiate compactions.  This will run in a separate thread.
  * It's critical that there exactly 1 of these in a given warehouse.
  */
-public class Initiator extends MetaStoreCompactorThread {
+public class Initiator extends InitiatorBase {
   static final private String CLASS_NAME = Initiator.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
-  static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
-
   private ExecutorService compactionExecutor;
 
   private boolean metricsEnabled;
@@ -178,7 +141,7 @@ public class Initiator extends MetaStoreCompactorThread {
               }
 
               Table t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
-              String poolName = getPoolName(ci, t);
+              ci.poolName = getPoolName(ci, t);
               Partition p = resolvePartition(ci);
               if (p == null && ci.partName != null) {
                 LOG.info("Can't find partition " + ci.getFullPartitionName() +
@@ -194,7 +157,7 @@ public class Initiator extends MetaStoreCompactorThread {
 
               CompletableFuture<Void> asyncJob = CompletableFuture.runAsync(
                       CompactorUtil.ThrowingRunnable.unchecked(() ->
-                          scheduleCompactionIfRequired(ci, t, p, poolName, runAs, metricsEnabled)), compactionExecutor)
+                          scheduleCompactionIfRequired(ci, t, p, runAs, metricsEnabled)), compactionExecutor)
                   .exceptionally(exc -> {
                     LOG.error("Error while running scheduling the compaction on the table {} / partition {}.", tableName, partition, exc);
                     return null;
@@ -250,36 +213,6 @@ public class Initiator extends MetaStoreCompactorThread {
         MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
   }
 
-  private void scheduleCompactionIfRequired(CompactionInfo ci, Table t, Partition p, String poolName,
-      String runAs, boolean metricsEnabled)
-      throws MetaException {
-    StorageDescriptor sd = resolveStorageDescriptor(t, p);
-    try {
-      ValidWriteIdList validWriteIds = resolveValidWriteIds(t);
-
-      checkInterrupt();
-
-      CompactionType type = checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs);
-      if (type != null) {
-        ci.type = type;
-        ci.poolName = poolName;
-        requestCompaction(ci, runAs);
-      }
-    } catch (InterruptedException e) {
-      //Handle InterruptedException separately so the compactioninfo won't be marked as failed.
-      LOG.info("Initiator pool is being shut down, task received interruption.");
-    } catch (Throwable ex) {
-      String errorMessage = "Caught exception while trying to determine if we should compact " + ci + ". Marking "
-          + "failed to avoid repeated failures, " + ex;
-      LOG.error(errorMessage);
-      ci.errorMessage = errorMessage;
-      if (metricsEnabled) {
-        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc();
-      }
-      txnHandler.markFailed(ci);
-    }
-  }
-
   private String getPoolName(CompactionInfo ci, Table t) throws Exception {
     Map<String, String> params = t.getParameters();
     String poolName = params == null ? null : params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
@@ -294,16 +227,7 @@ public class Initiator extends MetaStoreCompactorThread {
     return CompactorUtil.resolveDatabase(conf, ci.dbname);
   }
 
-  private ValidWriteIdList resolveValidWriteIds(Table t) throws NoSuchTxnException, MetaException {
-    ValidTxnList validTxnList = new ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
-    // The response will have one entry per table and hence we get only one ValidWriteIdList
-    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
-    GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
-    rqst.setValidTxnList(validTxnList.writeToString());
-    return TxnUtils.createValidCompactWriteIdList(
-        txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
-  }
 
   @VisibleForTesting
   protected String resolveUserToRunAs(Map<String, String> cache, Table t, Partition p)
@@ -394,160 +318,6 @@ public class Initiator extends MetaStoreCompactorThread {
     return false;
   }
 
-  private CompactionType checkForCompaction(final CompactionInfo ci,
-                                            final ValidWriteIdList writeIds,
-                                            final StorageDescriptor sd,
-                                            final Map<String, String> tblproperties,
-                                            final String runAs)
-      throws IOException, InterruptedException {
-    // If it's marked as too many aborted, we already know we need to compact
-    if (ci.tooManyAborts) {
-      LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", " +
-          "initiating major compaction");
-      return CompactionType.MAJOR;
-    }
-
-    if (ci.hasOldAbort) {
-      HiveConf.ConfVars oldAbortedTimeoutProp =
-          HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
-      LOG.debug("Found an aborted transaction for " + ci.getFullPartitionName()
-          + " with age older than threshold " + oldAbortedTimeoutProp + ": " + conf
-          .getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
-          + "Initiating minor compaction.");
-      return CompactionType.MINOR;
-    }
-    AcidDirectory acidDirectory = getAcidDirectory(sd, writeIds);
-    long baseSize = getBaseSize(acidDirectory);
-    FileSystem fs = acidDirectory.getFs();
-    Map<Path, Long> deltaSizes = new HashMap<>();
-    for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
-      deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
-    }
-    long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
-    AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, ci.partName, conf, txnHandler,
-        baseSize, deltaSizes, acidDirectory.getObsolete());
-
-    if (runJobAsSelf(runAs)) {
-      return determineCompactionType(ci, acidDirectory, tblproperties, baseSize, deltaSize);
-    } else {
-      LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName());
-      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
-          UserGroupInformation.getLoginUser());
-      CompactionType compactionType;
-      try {
-        compactionType = ugi.doAs(
-            (PrivilegedExceptionAction<CompactionType>) () -> determineCompactionType(ci, acidDirectory, tblproperties, baseSize, deltaSize));
-      } finally {
-        try {
-          FileSystem.closeAllForUGI(ugi);
-        } catch (IOException exception) {
-          LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
-              ci.getFullPartitionName(), exception);
-        }
-      }
-      return compactionType;
-    }
-  }
-
-  private AcidDirectory getAcidDirectory(StorageDescriptor sd, ValidWriteIdList writeIds) throws IOException {
-    Path location = new Path(sd.getLocation());
-    FileSystem fs = location.getFileSystem(conf);
-    return AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
-  }
-
-  private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir, Map<String,
-      String> tblproperties, long baseSize, long deltaSize) {
-    boolean noBase = false;
-    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
-    if (baseSize == 0 && deltaSize > 0) {
-      noBase = true;
-    } else {
-      String deltaPctProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
-          HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
-      float deltaPctThreshold = deltaPctProp == null ?
-          HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) :
-          Float.parseFloat(deltaPctProp);
-      boolean bigEnough = (float)deltaSize/(float)baseSize > deltaPctThreshold;
-      boolean multiBase = dir.getObsolete().stream()
-          .anyMatch(path -> path.getName().startsWith(AcidUtils.BASE_PREFIX));
-
-      boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
-      if (LOG.isDebugEnabled()) {
-        StringBuilder msg = new StringBuilder("delta size: ");
-        msg.append(deltaSize);
-        msg.append(" base size: ");
-        msg.append(baseSize);
-        msg.append(" multiBase ");
-        msg.append(multiBase);
-        msg.append(" deltaSize ");
-        msg.append(deltaSize);
-        msg.append(" threshold: ");
-        msg.append(deltaPctThreshold);
-        msg.append(" delta/base ratio > ").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
-            .append(": ");
-        msg.append(bigEnough);
-        msg.append(".");
-        if (!initiateMajor) {
-          msg.append("not");
-        }
-        msg.append(" initiating major compaction.");
-        LOG.debug(msg.toString());
-      }
-      if (initiateMajor) return CompactionType.MAJOR;
-    }
-
-    String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
-        HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
-    int deltaNumThreshold = deltaNumProp == null ?
-        HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) :
-        Integer.parseInt(deltaNumProp);
-    boolean enough = deltas.size() > deltaNumThreshold;
-    if (!enough) {
-      LOG.debug("Not enough deltas to initiate compaction for table=" + ci.tableName + "partition=" + ci.partName
-          + ". Found: " + deltas.size() + " deltas, threshold is " + deltaNumThreshold);
-      return null;
-    }
-    // If there's no base file, do a major compaction
-    LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" : "has") + " base," +
-        "requesting " + (noBase ? "major" : "minor") + " compaction");
-
-    return noBase || !isMinorCompactionSupported(tblproperties, dir) ?
-        CompactionType.MAJOR : CompactionType.MINOR;
-  }
-
-  private long getBaseSize(AcidDirectory dir) throws IOException {
-    long baseSize = 0;
-    if (dir.getBase() != null) {
-      baseSize = getDirSize(dir.getFs(), dir.getBase());
-    } else {
-      for (HdfsFileStatusWithId origStat : dir.getOriginalFiles()) {
-        baseSize += origStat.getFileStatus().getLen();
-      }
-    }
-    return baseSize;
-  }
-
-  private long getDirSize(FileSystem fs, ParsedDirectory dir) throws IOException {
-    return dir.getFiles(fs, Ref.from(false)).stream()
-        .map(HdfsFileStatusWithId::getFileStatus)
-        .mapToLong(FileStatus::getLen)
-        .sum();
-  }
-
-  private void requestCompaction(CompactionInfo ci, String runAs) throws MetaException {
-    CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, ci.type);
-    if (ci.partName != null) rqst.setPartitionname(ci.partName);
-    rqst.setRunas(runAs);
-    rqst.setInitiatorId(getInitiatorId(Thread.currentThread().getId()));
-    rqst.setInitiatorVersion(this.runtimeVersion);
-    rqst.setPoolName(ci.poolName);
-    LOG.info("Requesting compaction: " + rqst);
-    CompactionResponse resp = txnHandler.compact(rqst);
-    if(resp.isAccepted()) {
-      ci.id = resp.getId();
-    }
-  }
-
   // Check if it's a dynamic partitioning case. If so, do not initiate compaction for streaming ingest, only for aborts.
   private static boolean isDynPartIngest(Table t, CompactionInfo ci){
     if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 &&
@@ -637,13 +407,6 @@ public class Initiator extends MetaStoreCompactorThread {
     return true;
   }
 
-  private String getInitiatorId(long threadId) {
-    StringBuilder name = new StringBuilder(this.hostName);
-    name.append("-");
-    name.append(threadId);
-    return name.toString();
-  }
-
   private static class InitiatorCycleUpdater implements Runnable {
     private final String metric;
     private final long startedAt;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java
new file mode 100644
index 00000000000..14dd2ebffe0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.Ref;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class InitiatorBase extends MetaStoreCompactorThread {
+
+  static final private String COMPACTOR_THRESHOLD_PREFIX = "compactorthreshold.";
+
+  private List<CompactionResponse> initiateCompactionForMultiplePartitions(Table table,
+      Map<String, Partition> partitions, CompactionRequest request) {
+    List<CompactionResponse> compactionResponses = new ArrayList<>();
+    partitions.entrySet().parallelStream().forEach(entry -> {
+      try {
+        StorageDescriptor sd = resolveStorageDescriptor(table, entry.getValue());
+        String runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
+        CompactionInfo ci =
+            new CompactionInfo(table.getDbName(), table.getTableName(), entry.getKey(), request.getType());
+        ci.initiatorId = request.getInitiatorId();
+        ci.orderByClause = request.getOrderByClause();
+        ci.initiatorVersion = request.getInitiatorVersion();
+        if (request.getNumberOfBuckets() > 0) {
+          ci.numberOfBuckets = request.getNumberOfBuckets();
+        }
+        ci.poolName = request.getPoolName();
+        LOG.info(
+            "Checking to see if we should compact partition " + entry.getKey() + " of table " + table.getDbName() + "."
+                + table.getTableName());
+        CollectionUtils.addIgnoreNull(compactionResponses,
+            scheduleCompactionIfRequired(ci, table, entry.getValue(), runAs, false));
+      } catch (IOException | InterruptedException | MetaException e) {
+        LOG.error(
+            "Error occurred while Checking if we should compact partition " + entry.getKey() + " of table " + table.getDbName() + "."
+                + table.getTableName() + " Exception: " + e.getMessage());
+        throw new RuntimeException(e);
+      }
+    });
+    return compactionResponses;
+  }
+
+  public List<CompactionResponse> initiateCompactionForTable(CompactionRequest request, Table table, Map<String, Partition> partitions) throws Exception {
+    ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+
+    if (request.getPartitionname()!= null || partitions.isEmpty()) {
+      List<CompactionResponse> responses = new ArrayList<>();
+      responses.add(txnHandler.compact(request));
+      return responses;
+    } else {
+      return initiateCompactionForMultiplePartitions(table, partitions, request);
+    }
+  }
+
+  @Override protected boolean isCacheEnabled() {
+    return false;
+  }
+
+  private String getInitiatorId(long threadId) {
+    return this.hostName + "-" + threadId;
+  }
+
+  private CompactionResponse requestCompaction(CompactionInfo ci, String runAs) throws MetaException {
+    CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, ci.tableName, ci.type);
+    if (ci.partName != null)
+      compactionRequest.setPartitionname(ci.partName);
+    compactionRequest.setRunas(runAs);
+    if (StringUtils.isEmpty(ci.initiatorId)) {
+      compactionRequest.setInitiatorId(getInitiatorId(Thread.currentThread().getId()));
+    } else {
+      compactionRequest.setInitiatorId(ci.initiatorId);
+    }
+    compactionRequest.setInitiatorVersion(this.runtimeVersion);
+    compactionRequest.setPoolName(ci.poolName);
+    LOG.info("Requesting compaction: " + compactionRequest);
+    CompactionResponse resp = txnHandler.compact(compactionRequest);
+    if (resp.isAccepted()) {
+      ci.id = resp.getId();
+    }
+    return resp;
+  }
+
+  private AcidDirectory getAcidDirectory(StorageDescriptor sd, ValidWriteIdList writeIds) throws IOException {
+    Path location = new Path(sd.getLocation());
+    FileSystem fs = location.getFileSystem(conf);
+    return AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
+  }
+
+  private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir,
+      Map<String, String> tblProperties, long baseSize, long deltaSize) {
+    boolean noBase = false;
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+    if (baseSize == 0 && deltaSize > 0) {
+      noBase = true;
+    } else {
+      String deltaPctProp =
+          tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+      float deltaPctThreshold = deltaPctProp == null ? HiveConf.getFloatVar(conf,
+          HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : Float.parseFloat(deltaPctProp);
+      boolean bigEnough = (float) deltaSize / (float) baseSize > deltaPctThreshold;
+      boolean multiBase = dir.getObsolete().stream().anyMatch(path -> path.getName().startsWith(AcidUtils.BASE_PREFIX));
+
+      boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
+      if (LOG.isDebugEnabled()) {
+        StringBuilder msg = new StringBuilder("delta size: ");
+        msg.append(deltaSize);
+        msg.append(" base size: ");
+        msg.append(baseSize);
+        msg.append(" multiBase ");
+        msg.append(multiBase);
+        msg.append(" deltaSize ");
+        msg.append(deltaSize);
+        msg.append(" threshold: ");
+        msg.append(deltaPctThreshold);
+        msg.append(" delta/base ratio > ").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
+            .append(": ");
+        msg.append(bigEnough);
+        msg.append(".");
+        if (!initiateMajor) {
+          msg.append("not");
+        }
+        msg.append(" initiating major compaction.");
+        LOG.debug(msg.toString());
+      }
+      if (initiateMajor)
+        return CompactionType.MAJOR;
+    }
+
+    String deltaNumProp =
+        tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+    int deltaNumThreshold = deltaNumProp == null ? HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : Integer.parseInt(deltaNumProp);
+    boolean enough = deltas.size() > deltaNumThreshold;
+    if (!enough) {
+      LOG.debug("Not enough deltas to initiate compaction for table=" + ci.tableName + "partition=" + ci.partName
+          + ". Found: " + deltas.size() + " deltas, threshold is " + deltaNumThreshold);
+      return null;
+    }
+    // If there's no base file, do a major compaction
+    LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" : "has") + " base," + "requesting "
+        + (noBase ? "major" : "minor") + " compaction");
+
+    return noBase || !isMinorCompactionSupported(tblProperties, dir) ? CompactionType.MAJOR : CompactionType.MINOR;
+  }
+
+  private long getBaseSize(AcidDirectory dir) throws IOException {
+    long baseSize = 0;
+    if (dir.getBase() != null) {
+      baseSize = getDirSize(dir.getFs(), dir.getBase());
+    } else {
+      for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) {
+        baseSize += origStat.getFileStatus().getLen();
+      }
+    }
+    return baseSize;
+  }
+
+  private long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) throws IOException {
+    return dir.getFiles(fs, Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
+        .mapToLong(FileStatus::getLen).sum();
+  }
+
+  private CompactionType checkForCompaction(final CompactionInfo ci, final ValidWriteIdList writeIds,
+      final StorageDescriptor sd, final Map<String, String> tblProperties, final String runAs)
+      throws IOException, InterruptedException {
+    // If it's marked as too many aborted, we already know we need to compact
+    if (ci.tooManyAborts) {
+      LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", "
+          + "initiating major compaction");
+      return CompactionType.MAJOR;
+    }
+
+    if (ci.hasOldAbort) {
+      HiveConf.ConfVars oldAbortedTimeoutProp = HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
+      LOG.debug("Found an aborted transaction for " + ci.getFullPartitionName() + " with age older than threshold "
+          + oldAbortedTimeoutProp + ": " + conf.getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
+          + "Initiating minor compaction.");
+      return CompactionType.MINOR;
+    }
+    AcidDirectory acidDirectory = getAcidDirectory(sd, writeIds);
+    long baseSize = getBaseSize(acidDirectory);
+    FileSystem fs = acidDirectory.getFs();
+    Map<Path, Long> deltaSizes = new HashMap<>();
+    for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
+      deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
+    }
+    long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
+    AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, ci.partName, conf, txnHandler, baseSize,
+        deltaSizes, acidDirectory.getObsolete());
+
+    if (runJobAsSelf(runAs)) {
+      return determineCompactionType(ci, acidDirectory, tblProperties, baseSize, deltaSize);
+    } else {
+      LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName());
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, UserGroupInformation.getLoginUser());
+      CompactionType compactionType;
+      try {
+        compactionType = ugi.doAs(
+            (PrivilegedExceptionAction<CompactionType>) () -> determineCompactionType(ci, acidDirectory, tblProperties,
+                baseSize, deltaSize));
+      } finally {
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + ci.getFullPartitionName(),
+              exception);
+        }
+      }
+      return compactionType;
+    }
+  }
+
+  private ValidWriteIdList resolveValidWriteIds(Table t)
+      throws NoSuchTxnException, MetaException {
+    ValidTxnList validTxnList = new ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
+    // The response will have one entry per table and hence we get only one ValidWriteIdList
+    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+    GetValidWriteIdsRequest validWriteIdsRequest = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+    validWriteIdsRequest.setValidTxnList(validTxnList.writeToString());
+
+    return TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(validWriteIdsRequest).getTblValidWriteIds().get(0));
+  }
+
+  protected CompactionResponse scheduleCompactionIfRequired(CompactionInfo ci, Table t,
+      Partition p, String runAs, boolean metricsEnabled)
+      throws MetaException {
+    StorageDescriptor sd = resolveStorageDescriptor(t, p);
+    try {
+      ValidWriteIdList validWriteIds = resolveValidWriteIds(t);
+
+      checkInterrupt();
+
+      CompactionType type = checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs);
+      if (type != null) {
+        ci.type = type;
+        return requestCompaction(ci, runAs);
+      }
+    } catch (InterruptedException e) {
+      //Handle InterruptedException separately so the compactionInfo won't be marked as failed.
+      LOG.info("Initiator pool is being shut down, task received interruption.");
+    } catch (Throwable ex) {
+      String errorMessage = "Caught exception while trying to determine if we should compact " + ci + ". Marking "
+          + "failed to avoid repeated failures, " + ex;
+      LOG.error(errorMessage);
+      ci.errorMessage = errorMessage;
+      if (metricsEnabled) {
+        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc();
+      }
+      txnHandler.markFailed(ci);
+    }
+    return null;
+  }
+
+}
diff --git a/ql/src/test/queries/clientpositive/manual_compaction.q b/ql/src/test/queries/clientpositive/manual_compaction.q
new file mode 100644
index 00000000000..a794ff7555a
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/manual_compaction.q
@@ -0,0 +1,68 @@
+-- Mask the enqueue time which is based on current time
+--! qt:replace:/(initiated\s+---\s+---\s+)[0-9]*(\s+---)/$1#Masked#$2/
+--! qt:replace:/(---\s+)[a-zA-Z0-9\-]+(\s+manual)/$1#Masked#$2/
+-- Mask the hostname in show compaction
+--! qt:replace:/(---\s+)[\S]*(\s+manual)/$1#Masked#$2/
+-- Mask compaction id as they will be allocated in parallel threads
+--! qt:replace:/^[0-9]/#Masked#/
+
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+create table UN_PARTITIONED_T(key string, val string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true');
+
+create table UN_PARTITIONED_T_MINOR(key string, val string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true');
+
+create table PARTITIONED_T(key string, val string) partitioned by (dt string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true');
+
+alter table UN_PARTITIONED_T compact 'major';
+
+alter table UN_PARTITIONED_T_MINOR compact 'minor';
+
+alter table PARTITIONED_T add partition(dt='2023');
+
+insert into PARTITIONED_T partition(dt='2023') values ('k1','v1');
+insert into PARTITIONED_T partition(dt='2023') values ('k2','v2');
+insert into PARTITIONED_T partition(dt='2023') values ('k3','v3');
+
+alter table PARTITIONED_T partition(dt='2023') compact 'minor';
+
+SHOW COMPACTIONS ORDER BY 'PARTITION' DESC;
+
+alter table PARTITIONED_T add partition(dt='2024');
+
+insert into PARTITIONED_T partition(dt='2024') values ('k1','v1');
+insert into PARTITIONED_T partition(dt='2024') values ('k2','v2');
+insert into PARTITIONED_T partition(dt='2024') values ('k3','v3');
+insert into PARTITIONED_T partition(dt='2024') values ('k4','v4');
+insert into PARTITIONED_T partition(dt='2024') values ('k5','v5');
+insert into PARTITIONED_T partition(dt='2024') values ('k6','v6');
+insert into PARTITIONED_T partition(dt='2024') values ('k7','v7');
+insert into PARTITIONED_T partition(dt='2024') values ('k8','v8');
+insert into PARTITIONED_T partition(dt='2024') values ('k9','v9');
+insert into PARTITIONED_T partition(dt='2024') values ('k10','v10');
+insert into PARTITIONED_T partition(dt='2024') values ('k11','v11');
+
+insert into PARTITIONED_T partition(dt='2022') values ('k1','v1');
+insert into PARTITIONED_T partition(dt='2022') values ('k2','v2');
+insert into PARTITIONED_T partition(dt='2022') values ('k3','v3');
+insert into PARTITIONED_T partition(dt='2022') values ('k4','v4');
+insert into PARTITIONED_T partition(dt='2022') values ('k5','v5');
+insert into PARTITIONED_T partition(dt='2022') values ('k6','v6');
+insert into PARTITIONED_T partition(dt='2022') values ('k7','v7');
+insert into PARTITIONED_T partition(dt='2022') values ('k8','v8');
+insert into PARTITIONED_T partition(dt='2022') values ('k9','v9');
+insert into PARTITIONED_T partition(dt='2022') values ('k10','v10');
+insert into PARTITIONED_T partition(dt='2022') values ('k11','v11');
+
+explain alter table PARTITIONED_T compact 'major';
+
+alter table PARTITIONED_T compact 'major';
+
+SHOW COMPACTIONS ORDER BY 'PARTITION' DESC;
+
+drop table UN_PARTITIONED_T;
+
+drop table UN_PARTITIONED_T_MINOR;
+
+drop table PARTITIONED_T;
diff --git a/ql/src/test/results/clientpositive/llap/manual_compaction.q.out b/ql/src/test/results/clientpositive/llap/manual_compaction.q.out
new file mode 100644
index 00000000000..90dddd709e1
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/manual_compaction.q.out
@@ -0,0 +1,387 @@
+PREHOOK: query: create table UN_PARTITIONED_T(key string, val string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@UN_PARTITIONED_T
+POSTHOOK: query: create table UN_PARTITIONED_T(key string, val string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@UN_PARTITIONED_T
+PREHOOK: query: create table UN_PARTITIONED_T_MINOR(key string, val string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@UN_PARTITIONED_T_MINOR
+POSTHOOK: query: create table UN_PARTITIONED_T_MINOR(key string, val string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@UN_PARTITIONED_T_MINOR
+PREHOOK: query: create table PARTITIONED_T(key string, val string) partitioned by (dt string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@PARTITIONED_T
+POSTHOOK: query: create table PARTITIONED_T(key string, val string) partitioned by (dt string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@PARTITIONED_T
+PREHOOK: query: alter table UN_PARTITIONED_T compact 'major'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@un_partitioned_t
+PREHOOK: Output: default@un_partitioned_t
+POSTHOOK: query: alter table UN_PARTITIONED_T compact 'major'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@un_partitioned_t
+POSTHOOK: Output: default@un_partitioned_t
+PREHOOK: query: alter table UN_PARTITIONED_T_MINOR compact 'minor'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@un_partitioned_t_minor
+PREHOOK: Output: default@un_partitioned_t_minor
+POSTHOOK: query: alter table UN_PARTITIONED_T_MINOR compact 'minor'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@un_partitioned_t_minor
+POSTHOOK: Output: default@un_partitioned_t_minor
+PREHOOK: query: alter table PARTITIONED_T add partition(dt='2023')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: alter table PARTITIONED_T add partition(dt='2023')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t@dt=2023
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2023') values ('k1','v1')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2023') values ('k1','v1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2023') values ('k2','v2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2023') values ('k2','v2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2023') values ('k3','v3')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2023') values ('k3','v3')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).val SCRIPT []
+PREHOOK: query: alter table PARTITIONED_T partition(dt='2023') compact 'minor'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@partitioned_t
+PREHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: query: alter table PARTITIONED_T partition(dt='2023') compact 'minor'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t@dt=2023
+PREHOOK: query: SHOW COMPACTIONS ORDER BY 'PARTITION' DESC
+PREHOOK: type: SHOW COMPACTIONS
+POSTHOOK: query: SHOW COMPACTIONS ORDER BY 'PARTITION' DESC
+POSTHOOK: type: SHOW COMPACTIONS
+CompactionId	Database	Table	Partition	Type	State	Worker host	Worker	Enqueue Time	Start Time	Duration(ms)	HadoopJobId	Error message	Initiator host	Initiator	Pool name	TxnId	Next TxnId	Commit Time	Highest WriteId
+#Masked#	default	un_partitioned_t_minor	 ---	MINOR	initiated	 ---	 ---	#Masked#	 ---	 ---	 ---	 ---	#Masked#	manual	default	0	0	0	 ---
+#Masked#	default	un_partitioned_t	 ---	MAJOR	initiated	 ---	 ---	#Masked#	 ---	 ---	 ---	 ---	#Masked#	manual	default	0	0	0	 ---
+#Masked#	default	partitioned_t	dt=2023	MINOR	initiated	 ---	 ---	#Masked#	 ---	 ---	 ---	 ---	#Masked#	manual	default	0	0	0	 ---
+PREHOOK: query: alter table PARTITIONED_T add partition(dt='2024')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: alter table PARTITIONED_T add partition(dt='2024')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t@dt=2024
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k1','v1')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k1','v1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k2','v2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k2','v2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k3','v3')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k3','v3')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k4','v4')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k4','v4')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k5','v5')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k5','v5')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k6','v6')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k6','v6')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k7','v7')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k7','v7')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k8','v8')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k8','v8')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k9','v9')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k9','v9')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k10','v10')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k10','v10')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k11','v11')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k11','v11')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k1','v1')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k1','v1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k2','v2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k2','v2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k3','v3')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k3','v3')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k4','v4')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k4','v4')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k5','v5')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k5','v5')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k6','v6')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k6','v6')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k7','v7')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k7','v7')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k8','v8')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k8','v8')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k9','v9')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k9','v9')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k10','v10')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k10','v10')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k11','v11')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k11','v11')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: explain alter table PARTITIONED_T compact 'major'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@partitioned_t
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: explain alter table PARTITIONED_T compact 'major'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+      Compact
+        compaction type: major
+        table name: default.PARTITIONED_T
+        numberOfBuckets: 0
+        table name: default.PARTITIONED_T
+
+PREHOOK: query: alter table PARTITIONED_T compact 'major'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@partitioned_t
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: alter table PARTITIONED_T compact 'major'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t
+PREHOOK: query: SHOW COMPACTIONS ORDER BY 'PARTITION' DESC
+PREHOOK: type: SHOW COMPACTIONS
+POSTHOOK: query: SHOW COMPACTIONS ORDER BY 'PARTITION' DESC
+POSTHOOK: type: SHOW COMPACTIONS
+CompactionId	Database	Table	Partition	Type	State	Worker host	Worker	Enqueue Time	Start Time	Duration(ms)	HadoopJobId	Error message	Initiator host	Initiator	Pool name	TxnId	Next TxnId	Commit Time	Highest WriteId
+#Masked#	default	un_partitioned_t_minor	 ---	MINOR	initiated	 ---	 ---	#Masked#	 ---	 ---	 ---	 ---	#Masked#	manual	default	0	0	0	 ---
+#Masked#	default	un_partitioned_t	 ---	MAJOR	initiated	 ---	 ---	#Masked#	 ---	 ---	 ---	 ---	#Masked#	manual	default	0	0	0	 ---
+#Masked#	default	partitioned_t	dt=2024	MAJOR	initiated	 ---	 ---	#Masked#	 ---	 ---	 ---	 ---	#Masked#	manual	default	0	0	0	 ---
+#Masked#	default	partitioned_t	dt=2023	MINOR	initiated	 ---	 ---	#Masked#	 ---	 ---	 ---	 ---	#Masked#	manual	default	0	0	0	 ---
+#Masked#	default	partitioned_t	dt=2022	MAJOR	initiated	 ---	 ---	#Masked#	 ---	 ---	 ---	 ---	#Masked#	manual	default	0	0	0	 ---
+PREHOOK: query: drop table UN_PARTITIONED_T
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@un_partitioned_t
+PREHOOK: Output: database:default
+PREHOOK: Output: default@un_partitioned_t
+POSTHOOK: query: drop table UN_PARTITIONED_T
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@un_partitioned_t
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@un_partitioned_t
+PREHOOK: query: drop table UN_PARTITIONED_T_MINOR
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@un_partitioned_t_minor
+PREHOOK: Output: database:default
+PREHOOK: Output: default@un_partitioned_t_minor
+POSTHOOK: query: drop table UN_PARTITIONED_T_MINOR
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@un_partitioned_t_minor
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@un_partitioned_t_minor
+PREHOOK: query: drop table PARTITIONED_T
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@partitioned_t
+PREHOOK: Output: database:default
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: drop table PARTITIONED_T
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@partitioned_t
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@partitioned_t
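
---

Editor's note: the manual_compaction.q test above condenses to the following minimal, illustrative session. The table name `events` is hypothetical and not part of the commit; the behavior described matches the patch (the removed NO_COMPACTION_PARTITION error and the new per-partition fan-out in InitiatorBase).

```sql
-- Illustrative sketch based on manual_compaction.q; `events` is a made-up table.
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

create table events (key string, val string)
  partitioned by (dt string)
  clustered by (val) into 2 buckets
  stored as orc
  tblproperties ('transactional'='true');

insert into events partition (dt='2023') values ('k1','v1');
insert into events partition (dt='2024') values ('k2','v2');

-- Before HIVE-27598 a table-level compact on a partitioned table failed with
-- NO_COMPACTION_PARTITION; with this change it enqueues one compaction request
-- per partition that meets the compaction criteria.
alter table events compact 'major';

-- Expect one row per enqueued request, as in the masked q.out above.
show compactions;
```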