This is an automated email from the ASF dual-hosted git repository.

veghlaci05 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new d2d4b86433e HIVE-27598 - Enhance alter table compact to work for partitioned tables (Taraka Rama Rao Lethavadla, reviewed by Laszlo Vegh)
d2d4b86433e is described below

commit d2d4b86433ef356bf738edd72bfef74511a15444
Author: tarak271 <tarakaramarao.lethava...@gmail.com>
AuthorDate: Tue Oct 31 15:08:20 2023 +0530

    HIVE-27598 - Enhance alter table compact to work for partitioned tables (Taraka Rama Rao Lethavadla, reviewed by Laszlo Vegh)
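
    With this change, ALTER TABLE ... COMPACT on a partitioned table no longer
    requires a PARTITION clause: when the clause is omitted, the operation now
    checks every partition against the initiator thresholds (moved from
    Initiator into the new InitiatorBase) and enqueues a compaction request for
    each partition that qualifies. A minimal usage sketch, adapted from the
    manual_compaction.q test added in this patch (table and partition names are
    the ones used by that test):

        -- enqueue compaction for every qualifying partition of the table
        alter table PARTITIONED_T compact 'major';
        -- a single partition can still be targeted explicitly
        alter table PARTITIONED_T partition(dt='2023') compact 'minor';
        -- inspect the enqueued requests
        SHOW COMPACTIONS ORDER BY 'PARTITION' DESC;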
---
 .../compact/AlterTableCompactOperation.java        | 130 ++++---
 .../hadoop/hive/ql/txn/compactor/Initiator.java    | 249 +------------
 .../hive/ql/txn/compactor/InitiatorBase.java       | 309 ++++++++++++++++
 .../queries/clientpositive/manual_compaction.q     |  68 ++++
 .../clientpositive/llap/manual_compaction.q.out    | 387 +++++++++++++++++++++
 5 files changed, 843 insertions(+), 300 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
index 9187101367b..cc896331aff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
@@ -18,27 +18,27 @@
 
 package org.apache.hadoop.hive.ql.ddl.table.storage.compact;
 
-import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.utils.JavaUtils;
-import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-
-import java.util.List;
-import java.util.Map;
 
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.DDLOperation;
+import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.txn.compactor.InitiatorBase;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hive.ql.io.AcidUtils.compactionTypeStr2ThriftType;
 
@@ -50,80 +50,95 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe
     super(context, desc);
   }
 
-  @Override
-  public int execute() throws HiveException {
+  @Override public int execute() throws Exception {
     Table table = context.getDb().getTable(desc.getTableName());
     if (!AcidUtils.isTransactionalTable(table)) {
       throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, table.getDbName(), table.getTableName());
     }
 
-    String partitionName = getPartitionName(table);
+    CompactionRequest compactionRequest = new CompactionRequest(table.getDbName(), table.getTableName(),
+        compactionTypeStr2ThriftType(desc.getCompactionType()));
 
-    CompactionResponse resp = compact(table, partitionName);
-    if (!resp.isAccepted()) {
-      String message = Constants.ERROR_MESSAGE_NO_DETAILS_AVAILABLE;
-      if (resp.isSetErrormessage()) {
-        message = resp.getErrormessage();
-      }
-      throw new HiveException(ErrorMsg.COMPACTION_REFUSED,
-          table.getDbName(), table.getTableName(), partitionName == null ? "" : "(partition=" + partitionName + ")", message);
-    }
+    compactionRequest.setPoolName(desc.getPoolName());
+    compactionRequest.setProperties(desc.getProperties());
+    compactionRequest.setInitiatorId(JavaUtils.hostname() + "-" + HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION);
+    compactionRequest.setInitiatorVersion(HiveMetaStoreClient.class.getPackage().getImplementationVersion());
+    compactionRequest.setOrderByClause(desc.getOrderByClause());
 
-    if (desc.isBlocking() && resp.isAccepted()) {
-      waitForCompactionToFinish(resp);
+    if (desc.getNumberOfBuckets() > 0) {
+      compactionRequest.setNumberOfBuckets(desc.getNumberOfBuckets());
     }
 
+    InitiatorBase initiatorBase = new InitiatorBase();
+    initiatorBase.setConf(context.getConf());
+    initiatorBase.init(new AtomicBoolean());
+
+    Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
+        convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
+
+    if(desc.getPartitionSpec() != null){
+      Optional<String> partitionName =  partitionMap.keySet().stream().findFirst();
+      partitionName.ifPresent(compactionRequest::setPartitionname);
+    }
+    List<CompactionResponse> compactionResponses =
+        initiatorBase.initiateCompactionForTable(compactionRequest, table.getTTable(), partitionMap);
+    for (CompactionResponse compactionResponse : compactionResponses) {
+      if (!compactionResponse.isAccepted()) {
+        String message;
+        if (compactionResponse.isSetErrormessage()) {
+          message = compactionResponse.getErrormessage();
+          throw new HiveException(ErrorMsg.COMPACTION_REFUSED, table.getDbName(), table.getTableName(),
+              "CompactionId: " + compactionResponse.getId(), message);
+        }
+        context.getConsole().printInfo(
+            "Compaction already enqueued with id " + compactionResponse.getId() + "; State is "
+                + compactionResponse.getState());
+        continue;
+      }
+      context.getConsole().printInfo("Compaction enqueued with id " + compactionResponse.getId());
+      if (desc.isBlocking() && compactionResponse.isAccepted()) {
+        waitForCompactionToFinish(compactionResponse, context);
+      }
+    }
     return 0;
   }
 
-  private String getPartitionName(Table table) throws HiveException {
-    String partitionName = null;
+  private List<Partition> getPartitions(Table table, AlterTableCompactDesc desc, DDLOperationContext context)
+      throws HiveException {
+    List<Partition> partitions = new ArrayList<>();
+
     if (desc.getPartitionSpec() == null) {
-      if (table.isPartitioned()) { // Compaction can only be done on the whole table if the table is non-partitioned.
-        throw new HiveException(ErrorMsg.NO_COMPACTION_PARTITION);
+      if (table.isPartitioned()) {
+        // Compaction will get initiated for all the potential partitions that meets the criteria
+        partitions = context.getDb().getPartitions(table);
       }
     } else {
       Map<String, String> partitionSpec = desc.getPartitionSpec();
-      List<Partition> partitions = context.getDb().getPartitions(table, partitionSpec);
+      partitions = context.getDb().getPartitions(table, partitionSpec);
       if (partitions.size() > 1) {
         throw new HiveException(ErrorMsg.TOO_MANY_COMPACTION_PARTITIONS);
       } else if (partitions.size() == 0) {
         throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
       }
-      partitionName = partitions.get(0).getName();
     }
-    return partitionName;
+    return partitions;
   }
 
-  private CompactionResponse compact(Table table, String partitionName) throws HiveException {
-    CompactionRequest req = new CompactionRequest(table.getDbName(), table.getTableName(),
-        compactionTypeStr2ThriftType(desc.getCompactionType()));
-    req.setPartitionname(partitionName);
-    req.setPoolName(desc.getPoolName());
-    req.setProperties(desc.getProperties());
-    req.setInitiatorId(JavaUtils.hostname() + "-" + HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION);
-    req.setInitiatorVersion(HiveMetaStoreClient.class.getPackage().getImplementationVersion());
-    req.setOrderByClause(desc.getOrderByClause());
-    if (desc.getNumberOfBuckets() > 0) {
-      req.setNumberOfBuckets(desc.getNumberOfBuckets());
-    }
-    CompactionResponse resp = context.getDb().compact(req);
-    if (resp.isAccepted()) {
-      context.getConsole().printInfo("Compaction enqueued with id " + resp.getId());
-    } else {
-      context.getConsole().printInfo("Compaction already enqueued with id " + resp.getId() + "; State is " +
-          resp.getState());
-    }
-    return resp;
+  private Map<String, org.apache.hadoop.hive.metastore.api.Partition> convertPartitionsFromThriftToDB(
+      List<Partition> partitions) {
+    Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap = new LinkedHashMap<>();
+    partitions.forEach(partition -> partitionMap.put(partition.getName(), partition.getTPartition()));
+    return partitionMap;
   }
 
-  private void waitForCompactionToFinish(CompactionResponse resp) throws HiveException {
+  private void waitForCompactionToFinish(CompactionResponse resp, DDLOperationContext context) throws HiveException {
     StringBuilder progressDots = new StringBuilder();
     long waitTimeMs = 1000;
     long waitTimeOut = HiveConf.getLongVar(context.getConf(), HiveConf.ConfVars.HIVE_COMPACTOR_WAIT_TIMEOUT);
-    wait: while (true) {
+    wait:
+    while (true) {
       //double wait time until 5min
-      waitTimeMs = waitTimeMs*2;
+      waitTimeMs = waitTimeMs * 2;
       waitTimeMs = Math.min(waitTimeMs, waitTimeOut);
       try {
         Thread.sleep(waitTimeMs);
@@ -133,10 +148,11 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe
       }
       ShowCompactRequest request = new ShowCompactRequest();
       request.setId(resp.getId());
-      
+
       ShowCompactResponse compaction = context.getDb().showCompactions(request);
       if (compaction.getCompactsSize() == 1) {
         ShowCompactResponseElement comp = compaction.getCompacts().get(0);
+        LOG.debug("Response for cid: "+comp.getId()+" is "+comp.getState());
         switch (comp.getState()) {
           case TxnStore.WORKING_RESPONSE:
           case TxnStore.INITIATED_RESPONSE:
@@ -146,11 +162,11 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe
             continue wait;
           default:
             //done
-            context.getConsole().printInfo("Compaction with id " + resp.getId() + " finished with status: " +
-                comp.getState());
+            context.getConsole()
+                .printInfo("Compaction with id " + resp.getId() + " finished with status: " + comp.getState());
             break wait;
         }
-      }else {
+      } else {
         throw new HiveException("No suitable compaction found");
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a86c18baad9..bb48c8f219b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -20,33 +20,12 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ServerUtils;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionResponse;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
-import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
@@ -54,28 +33,14 @@ import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDirectory;
-import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.LongSummaryStatistics;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -86,12 +51,10 @@ import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_INTIATOR_THREAD_NA
  * A class to initiate compactions.  This will run in a separate thread.
  * It's critical that there exactly 1 of these in a given warehouse.
  */
-public class Initiator extends MetaStoreCompactorThread {
+public class Initiator extends InitiatorBase {
   static final private String CLASS_NAME = Initiator.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
-  static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
-
   private ExecutorService compactionExecutor;
 
   private boolean metricsEnabled;
@@ -178,7 +141,7 @@ public class Initiator extends MetaStoreCompactorThread {
               }
 
               Table t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
-              String poolName = getPoolName(ci, t);
+              ci.poolName = getPoolName(ci, t);
               Partition p = resolvePartition(ci);
               if (p == null && ci.partName != null) {
                 LOG.info("Can't find partition " + ci.getFullPartitionName() +
@@ -194,7 +157,7 @@ public class Initiator extends MetaStoreCompactorThread {
               CompletableFuture<Void> asyncJob =
                   CompletableFuture.runAsync(
                           CompactorUtil.ThrowingRunnable.unchecked(() ->
-                              scheduleCompactionIfRequired(ci, t, p, poolName, runAs, metricsEnabled)), compactionExecutor)
+                              scheduleCompactionIfRequired(ci, t, p, runAs, metricsEnabled)), compactionExecutor)
                       .exceptionally(exc -> {
                        LOG.error("Error while running scheduling the compaction on the table {} / partition {}.", tableName, partition, exc);
                         return null;
@@ -250,36 +213,6 @@ public class Initiator extends MetaStoreCompactorThread {
             MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
   }
 
-  private void scheduleCompactionIfRequired(CompactionInfo ci, Table t, Partition p, String poolName,
-                                            String runAs, boolean metricsEnabled)
-      throws MetaException {
-    StorageDescriptor sd = resolveStorageDescriptor(t, p);
-    try {
-      ValidWriteIdList validWriteIds = resolveValidWriteIds(t);
-
-      checkInterrupt();
-
-      CompactionType type = checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs);
-      if (type != null) {
-        ci.type = type;
-        ci.poolName = poolName;
-        requestCompaction(ci, runAs);
-      }
-    } catch (InterruptedException e) {
-      //Handle InterruptedException separately so the compactioninfo won't be marked as failed.
-      LOG.info("Initiator pool is being shut down, task received interruption.");
-    } catch (Throwable ex) {
-      String errorMessage = "Caught exception while trying to determine if we should compact " + ci + ". Marking "
-          + "failed to avoid repeated failures, " + ex;
-      LOG.error(errorMessage);
-      ci.errorMessage = errorMessage;
-      if (metricsEnabled) {
-        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc();
-      }
-      txnHandler.markFailed(ci);
-    }
-  }
-
   private String getPoolName(CompactionInfo ci, Table t) throws Exception {
     Map<String, String> params = t.getParameters();
     String poolName = params == null ? null : params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
@@ -294,16 +227,7 @@ public class Initiator extends MetaStoreCompactorThread {
     return CompactorUtil.resolveDatabase(conf, ci.dbname);
   }
 
-  private ValidWriteIdList resolveValidWriteIds(Table t) throws NoSuchTxnException, MetaException {
-    ValidTxnList validTxnList = new ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
-    // The response will have one entry per table and hence we get only one ValidWriteIdList
-    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
-    GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
-    rqst.setValidTxnList(validTxnList.writeToString());
 
-    return TxnUtils.createValidCompactWriteIdList(
-        txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
-  }
 
   @VisibleForTesting
   protected String resolveUserToRunAs(Map<String, String> cache, Table t, Partition p)
@@ -394,160 +318,6 @@ public class Initiator extends MetaStoreCompactorThread {
     return false;
   }
 
-  private CompactionType checkForCompaction(final CompactionInfo ci,
-                                            final ValidWriteIdList writeIds,
-                                            final StorageDescriptor sd,
-                                            final Map<String, String> tblproperties,
-                                            final String runAs)
-      throws IOException, InterruptedException {
-    // If it's marked as too many aborted, we already know we need to compact
-    if (ci.tooManyAborts) {
-      LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", " +
-          "initiating major compaction");
-      return CompactionType.MAJOR;
-    }
-
-    if (ci.hasOldAbort) {
-      HiveConf.ConfVars oldAbortedTimeoutProp =
-          HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
-      LOG.debug("Found an aborted transaction for " + ci.getFullPartitionName()
-          + " with age older than threshold " + oldAbortedTimeoutProp + ": " + conf
-          .getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
-          + "Initiating minor compaction.");
-      return CompactionType.MINOR;
-    }
-    AcidDirectory acidDirectory = getAcidDirectory(sd, writeIds);
-    long baseSize = getBaseSize(acidDirectory);
-    FileSystem fs = acidDirectory.getFs();
-    Map<Path, Long> deltaSizes = new HashMap<>();
-    for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
-      deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
-    }
-    long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
-    AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, ci.partName, conf, txnHandler,
-        baseSize, deltaSizes, acidDirectory.getObsolete());
-
-    if (runJobAsSelf(runAs)) {
-      return determineCompactionType(ci, acidDirectory, tblproperties, baseSize, deltaSize);
-    } else {
-      LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName());
-      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
-        UserGroupInformation.getLoginUser());
-      CompactionType compactionType;
-      try {
-        compactionType = ugi.doAs(
-            (PrivilegedExceptionAction<CompactionType>) () -> determineCompactionType(ci, acidDirectory, tblproperties, baseSize, deltaSize));
-      } finally {
-        try {
-          FileSystem.closeAllForUGI(ugi);
-        } catch (IOException exception) {
-          LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
-              ci.getFullPartitionName(), exception);
-        }
-      }
-      return compactionType;
-    }
-  }
-
-  private AcidDirectory getAcidDirectory(StorageDescriptor sd, ValidWriteIdList writeIds) throws IOException {
-    Path location = new Path(sd.getLocation());
-    FileSystem fs = location.getFileSystem(conf);
-    return AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
-  }
-
-  private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir, Map<String,
-      String> tblproperties, long baseSize, long deltaSize) {
-    boolean noBase = false;
-    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
-    if (baseSize == 0 && deltaSize > 0) {
-      noBase = true;
-    } else {
-      String deltaPctProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
-          HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
-      float deltaPctThreshold = deltaPctProp == null ?
-          HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) :
-          Float.parseFloat(deltaPctProp);
-      boolean bigEnough =   (float)deltaSize/(float)baseSize > deltaPctThreshold;
-      boolean multiBase = dir.getObsolete().stream()
-              .anyMatch(path -> path.getName().startsWith(AcidUtils.BASE_PREFIX));
-
-      boolean initiateMajor =  bigEnough || (deltaSize == 0  && multiBase);
-      if (LOG.isDebugEnabled()) {
-        StringBuilder msg = new StringBuilder("delta size: ");
-        msg.append(deltaSize);
-        msg.append(" base size: ");
-        msg.append(baseSize);
-        msg.append(" multiBase ");
-        msg.append(multiBase);
-        msg.append(" deltaSize ");
-        msg.append(deltaSize);
-        msg.append(" threshold: ");
-        msg.append(deltaPctThreshold);
-        msg.append(" delta/base ratio > ").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
-            .append(": ");
-        msg.append(bigEnough);
-        msg.append(".");
-        if (!initiateMajor) {
-          msg.append("not");
-        }
-        msg.append(" initiating major compaction.");
-        LOG.debug(msg.toString());
-      }
-      if (initiateMajor) return CompactionType.MAJOR;
-    }
-
-    String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
-        HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
-    int deltaNumThreshold = deltaNumProp == null ?
-        HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) :
-        Integer.parseInt(deltaNumProp);
-    boolean enough = deltas.size() > deltaNumThreshold;
-    if (!enough) {
-      LOG.debug("Not enough deltas to initiate compaction for table=" + ci.tableName + "partition=" + ci.partName
-          + ". Found: " + deltas.size() + " deltas, threshold is " + deltaNumThreshold);
-      return null;
-    }
-    // If there's no base file, do a major compaction
-    LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" : "has") + " base," +
-        "requesting " + (noBase ? "major" : "minor") + " compaction");
-
-    return noBase || !isMinorCompactionSupported(tblproperties, dir) ?
-            CompactionType.MAJOR : CompactionType.MINOR;
-  }
-
-  private long getBaseSize(AcidDirectory dir) throws IOException {
-    long baseSize = 0;
-    if (dir.getBase() != null) {
-      baseSize = getDirSize(dir.getFs(), dir.getBase());
-    } else {
-      for (HdfsFileStatusWithId origStat : dir.getOriginalFiles()) {
-        baseSize += origStat.getFileStatus().getLen();
-      }
-    }
-    return baseSize;
-  }
-
-  private long getDirSize(FileSystem fs, ParsedDirectory dir) throws IOException {
-    return dir.getFiles(fs, Ref.from(false)).stream()
-        .map(HdfsFileStatusWithId::getFileStatus)
-        .mapToLong(FileStatus::getLen)
-        .sum();
-  }
-
-  private void requestCompaction(CompactionInfo ci, String runAs) throws MetaException {
-    CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, ci.type);
-    if (ci.partName != null) rqst.setPartitionname(ci.partName);
-    rqst.setRunas(runAs);
-    rqst.setInitiatorId(getInitiatorId(Thread.currentThread().getId()));
-    rqst.setInitiatorVersion(this.runtimeVersion);
-    rqst.setPoolName(ci.poolName);
-    LOG.info("Requesting compaction: " + rqst);
-    CompactionResponse resp = txnHandler.compact(rqst);
-    if(resp.isAccepted()) {
-      ci.id = resp.getId();
-    }
-  }
-
   // Check if it's a dynamic partitioning case. If so, do not initiate compaction for streaming ingest, only for aborts.
   private static boolean isDynPartIngest(Table t, CompactionInfo ci){
     if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 &&
@@ -637,13 +407,6 @@ public class Initiator extends MetaStoreCompactorThread {
     return true;
   }
 
-  private String getInitiatorId(long threadId) {
-    StringBuilder name = new StringBuilder(this.hostName);
-    name.append("-");
-    name.append(threadId);
-    return name.toString();
-  }
-
   private static class InitiatorCycleUpdater implements Runnable {
     private final String metric;
     private final long startedAt;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java
new file mode 100644
index 00000000000..14dd2ebffe0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.Ref;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class InitiatorBase extends MetaStoreCompactorThread {
+
+  static final private String COMPACTOR_THRESHOLD_PREFIX = "compactorthreshold.";
+
+  private List<CompactionResponse> initiateCompactionForMultiplePartitions(Table table,
+      Map<String, Partition> partitions, CompactionRequest request) {
+    List<CompactionResponse> compactionResponses = new ArrayList<>();
+    partitions.entrySet().parallelStream().forEach(entry -> {
+      try {
+        StorageDescriptor sd = resolveStorageDescriptor(table, entry.getValue());
+        String runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
+        CompactionInfo ci =
+            new CompactionInfo(table.getDbName(), table.getTableName(), entry.getKey(), request.getType());
+        ci.initiatorId = request.getInitiatorId();
+        ci.orderByClause = request.getOrderByClause();
+        ci.initiatorVersion = request.getInitiatorVersion();
+        if (request.getNumberOfBuckets() > 0) {
+          ci.numberOfBuckets = request.getNumberOfBuckets();
+        }
+        ci.poolName = request.getPoolName();
+        LOG.info(
+            "Checking to see if we should compact partition " + entry.getKey() + " of table " + table.getDbName() + "."
+                + table.getTableName());
+        CollectionUtils.addIgnoreNull(compactionResponses,
+            scheduleCompactionIfRequired(ci, table, entry.getValue(), runAs, false));
+      } catch (IOException | InterruptedException | MetaException e) {
+        LOG.error(
+            "Error occurred while Checking if we should compact partition " + entry.getKey() + " of table " + table.getDbName() + "."
+                + table.getTableName() + " Exception: " + e.getMessage());
+        throw new RuntimeException(e);
+      }
+    });
+    return compactionResponses;
+  }
+
+  public List<CompactionResponse> initiateCompactionForTable(CompactionRequest request, Table table, Map<String, Partition> partitions) throws Exception {
+    ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+
+    if (request.getPartitionname()!= null || partitions.isEmpty()) {
+      List<CompactionResponse> responses = new ArrayList<>();
+      responses.add(txnHandler.compact(request));
+      return responses;
+    } else {
+      return initiateCompactionForMultiplePartitions(table, partitions, request);
+    }
+  }
+
+  @Override protected boolean isCacheEnabled() {
+    return false;
+  }
+
+  private String getInitiatorId(long threadId) {
+    return this.hostName + "-" + threadId;
+  }
+
+  private CompactionResponse requestCompaction(CompactionInfo ci, String runAs) throws MetaException {
+    CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, ci.tableName, ci.type);
+    if (ci.partName != null)
+      compactionRequest.setPartitionname(ci.partName);
+    compactionRequest.setRunas(runAs);
+    if (StringUtils.isEmpty(ci.initiatorId)) {
+      compactionRequest.setInitiatorId(getInitiatorId(Thread.currentThread().getId()));
+    } else {
+      compactionRequest.setInitiatorId(ci.initiatorId);
+    }
+    compactionRequest.setInitiatorVersion(this.runtimeVersion);
+    compactionRequest.setPoolName(ci.poolName);
+    LOG.info("Requesting compaction: " + compactionRequest);
+    CompactionResponse resp = txnHandler.compact(compactionRequest);
+    if (resp.isAccepted()) {
+      ci.id = resp.getId();
+    }
+    return resp;
+  }
+
+  private AcidDirectory getAcidDirectory(StorageDescriptor sd, ValidWriteIdList writeIds) throws IOException {
+    Path location = new Path(sd.getLocation());
+    FileSystem fs = location.getFileSystem(conf);
+    return AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
+  }
+
+  private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir,
+      Map<String, String> tblProperties, long baseSize, long deltaSize) {
+    boolean noBase = false;
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+    if (baseSize == 0 && deltaSize > 0) {
+      noBase = true;
+    } else {
+      String deltaPctProp =
+          tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+      float deltaPctThreshold = deltaPctProp == null ? HiveConf.getFloatVar(conf,
+          HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : Float.parseFloat(deltaPctProp);
+      boolean bigEnough = (float) deltaSize / (float) baseSize > deltaPctThreshold;
+      boolean multiBase = dir.getObsolete().stream().anyMatch(path -> path.getName().startsWith(AcidUtils.BASE_PREFIX));
+
+      boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
+      if (LOG.isDebugEnabled()) {
+        StringBuilder msg = new StringBuilder("delta size: ");
+        msg.append(deltaSize);
+        msg.append(" base size: ");
+        msg.append(baseSize);
+        msg.append(" multiBase ");
+        msg.append(multiBase);
+        msg.append(" deltaSize ");
+        msg.append(deltaSize);
+        msg.append(" threshold: ");
+        msg.append(deltaPctThreshold);
+        msg.append(" delta/base ratio > ").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
+            .append(": ");
+        msg.append(bigEnough);
+        msg.append(".");
+        if (!initiateMajor) {
+          msg.append("not");
+        }
+        msg.append(" initiating major compaction.");
+        LOG.debug(msg.toString());
+      }
+      if (initiateMajor)
+        return CompactionType.MAJOR;
+    }
+
+    String deltaNumProp =
+        tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+    int deltaNumThreshold = deltaNumProp == null ? HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : Integer.parseInt(deltaNumProp);
+    boolean enough = deltas.size() > deltaNumThreshold;
+    if (!enough) {
+      LOG.debug("Not enough deltas to initiate compaction for table=" + ci.tableName + "partition=" + ci.partName
+          + ". Found: " + deltas.size() + " deltas, threshold is " + deltaNumThreshold);
+      return null;
+    }
+    // If there's no base file, do a major compaction
+    LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" : "has") + " base," + "requesting "
+        + (noBase ? "major" : "minor") + " compaction");
+
+    return noBase || !isMinorCompactionSupported(tblProperties, dir) ? CompactionType.MAJOR : CompactionType.MINOR;
+  }
+
+  private long getBaseSize(AcidDirectory dir) throws IOException {
+    long baseSize = 0;
+    if (dir.getBase() != null) {
+      baseSize = getDirSize(dir.getFs(), dir.getBase());
+    } else {
+      for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) {
+        baseSize += origStat.getFileStatus().getLen();
+      }
+    }
+    return baseSize;
+  }
+
+  private long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) throws IOException {
+    return dir.getFiles(fs, Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
+        .mapToLong(FileStatus::getLen).sum();
+  }
+
+  private CompactionType checkForCompaction(final CompactionInfo ci, final ValidWriteIdList writeIds,
+      final StorageDescriptor sd, final Map<String, String> tblProperties, final String runAs)
+      throws IOException, InterruptedException {
+    // If it's marked as too many aborted, we already know we need to compact
+    if (ci.tooManyAborts) {
+      LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", "
+          + "initiating major compaction");
+      return CompactionType.MAJOR;
+    }
+
+    if (ci.hasOldAbort) {
+      HiveConf.ConfVars oldAbortedTimeoutProp = HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
+      LOG.debug("Found an aborted transaction for " + ci.getFullPartitionName() + " with age older than threshold "
+          + oldAbortedTimeoutProp + ": " + conf.getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
+          + "Initiating minor compaction.");
+      return CompactionType.MINOR;
+    }
+    AcidDirectory acidDirectory = getAcidDirectory(sd, writeIds);
+    long baseSize = getBaseSize(acidDirectory);
+    FileSystem fs = acidDirectory.getFs();
+    Map<Path, Long> deltaSizes = new HashMap<>();
+    for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
+      deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
+    }
+    long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
+    AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, ci.partName, conf, txnHandler, baseSize,
+        deltaSizes, acidDirectory.getObsolete());
+
+    if (runJobAsSelf(runAs)) {
+      return determineCompactionType(ci, acidDirectory, tblProperties, baseSize, deltaSize);
+    } else {
+      LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName());
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, UserGroupInformation.getLoginUser());
+      CompactionType compactionType;
+      try {
+        compactionType = ugi.doAs(
+            (PrivilegedExceptionAction<CompactionType>) () -> determineCompactionType(ci, acidDirectory, tblProperties,
+                baseSize, deltaSize));
+      } finally {
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + ci.getFullPartitionName(),
+              exception);
+        }
+      }
+      return compactionType;
+    }
+  }
+
+  private ValidWriteIdList resolveValidWriteIds(Table t)
+      throws NoSuchTxnException, MetaException {
+    ValidTxnList validTxnList = new ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
+    // The response will have one entry per table and hence we get only one ValidWriteIdList
+    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+    GetValidWriteIdsRequest validWriteIdsRequest = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+    validWriteIdsRequest.setValidTxnList(validTxnList.writeToString());
+
+    return TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(validWriteIdsRequest).getTblValidWriteIds().get(0));
+  }
+
+  protected CompactionResponse scheduleCompactionIfRequired(CompactionInfo ci, Table t,
+      Partition p, String runAs, boolean metricsEnabled)
+      throws MetaException {
+    StorageDescriptor sd = resolveStorageDescriptor(t, p);
+    try {
+      ValidWriteIdList validWriteIds = resolveValidWriteIds(t);
+
+      checkInterrupt();
+
+      CompactionType type = checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs);
+      if (type != null) {
+        ci.type = type;
+        return requestCompaction(ci, runAs);
+      }
+    } catch (InterruptedException e) {
+      //Handle InterruptedException separately so the compactionInfo won't be marked as failed.
+      LOG.info("Initiator pool is being shut down, task received interruption.");
+    } catch (Throwable ex) {
+      String errorMessage = "Caught exception while trying to determine if we should compact " + ci + ". Marking "
+          + "failed to avoid repeated failures, " + ex;
+      LOG.error(errorMessage);
+      ci.errorMessage = errorMessage;
+      if (metricsEnabled) {
+        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc();
+      }
+      txnHandler.markFailed(ci);
+    }
+    return null;
+  }
+
+}
diff --git a/ql/src/test/queries/clientpositive/manual_compaction.q b/ql/src/test/queries/clientpositive/manual_compaction.q
new file mode 100644
index 00000000000..a794ff7555a
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/manual_compaction.q
@@ -0,0 +1,68 @@
+-- Mask the enqueue time which is based on current time
+--! qt:replace:/(initiated\s+---\s+---\s+)[0-9]*(\s+---)/$1#Masked#$2/
+--! qt:replace:/(---\s+)[a-zA-Z0-9\-]+(\s+manual)/$1#Masked#$2/
+-- Mask the hostname in show compaction
+--! qt:replace:/(---\s+)[\S]*(\s+manual)/$1#Masked#$2/
+-- Mask compaction id as they will be allocated in parallel threads
+--! qt:replace:/^[0-9]/#Masked#/
+
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+create table UN_PARTITIONED_T(key string, val string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true');
+
+create table UN_PARTITIONED_T_MINOR(key string, val string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true');
+
+create table PARTITIONED_T(key string, val string) partitioned by (dt string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true');
+
+alter table UN_PARTITIONED_T compact 'major';
+
+alter table UN_PARTITIONED_T_MINOR compact 'minor';
+
+alter table PARTITIONED_T add partition(dt='2023');
+
+insert into PARTITIONED_T partition(dt='2023') values ('k1','v1');
+insert into PARTITIONED_T partition(dt='2023') values ('k2','v2');
+insert into PARTITIONED_T partition(dt='2023') values ('k3','v3');
+
+alter table PARTITIONED_T partition(dt='2023') compact 'minor';
+
+SHOW COMPACTIONS ORDER BY 'PARTITION' DESC;
+
+alter table PARTITIONED_T add partition(dt='2024');
+
+insert into PARTITIONED_T partition(dt='2024') values ('k1','v1');
+insert into PARTITIONED_T partition(dt='2024') values ('k2','v2');
+insert into PARTITIONED_T partition(dt='2024') values ('k3','v3');
+insert into PARTITIONED_T partition(dt='2024') values ('k4','v4');
+insert into PARTITIONED_T partition(dt='2024') values ('k5','v5');
+insert into PARTITIONED_T partition(dt='2024') values ('k6','v6');
+insert into PARTITIONED_T partition(dt='2024') values ('k7','v7');
+insert into PARTITIONED_T partition(dt='2024') values ('k8','v8');
+insert into PARTITIONED_T partition(dt='2024') values ('k9','v9');
+insert into PARTITIONED_T partition(dt='2024') values ('k10','v10');
+insert into PARTITIONED_T partition(dt='2024') values ('k11','v11');
+
+insert into PARTITIONED_T partition(dt='2022') values ('k1','v1');
+insert into PARTITIONED_T partition(dt='2022') values ('k2','v2');
+insert into PARTITIONED_T partition(dt='2022') values ('k3','v3');
+insert into PARTITIONED_T partition(dt='2022') values ('k4','v4');
+insert into PARTITIONED_T partition(dt='2022') values ('k5','v5');
+insert into PARTITIONED_T partition(dt='2022') values ('k6','v6');
+insert into PARTITIONED_T partition(dt='2022') values ('k7','v7');
+insert into PARTITIONED_T partition(dt='2022') values ('k8','v8');
+insert into PARTITIONED_T partition(dt='2022') values ('k9','v9');
+insert into PARTITIONED_T partition(dt='2022') values ('k10','v10');
+insert into PARTITIONED_T partition(dt='2022') values ('k11','v11');
+
+explain alter table PARTITIONED_T compact 'major';
+
+alter table PARTITIONED_T compact 'major';
+
+SHOW COMPACTIONS ORDER BY 'PARTITION' DESC;
+
+drop table UN_PARTITIONED_T;
+
+drop table UN_PARTITIONED_T_MINOR;
+
+drop table PARTITIONED_T;
diff --git a/ql/src/test/results/clientpositive/llap/manual_compaction.q.out b/ql/src/test/results/clientpositive/llap/manual_compaction.q.out
new file mode 100644
index 00000000000..90dddd709e1
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/manual_compaction.q.out
@@ -0,0 +1,387 @@
+PREHOOK: query: create table UN_PARTITIONED_T(key string, val string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@UN_PARTITIONED_T
+POSTHOOK: query: create table UN_PARTITIONED_T(key string, val string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@UN_PARTITIONED_T
+PREHOOK: query: create table UN_PARTITIONED_T_MINOR(key string, val string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@UN_PARTITIONED_T_MINOR
+POSTHOOK: query: create table UN_PARTITIONED_T_MINOR(key string, val string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@UN_PARTITIONED_T_MINOR
+PREHOOK: query: create table PARTITIONED_T(key string, val string) partitioned by (dt string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@PARTITIONED_T
+POSTHOOK: query: create table PARTITIONED_T(key string, val string) partitioned by (dt string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@PARTITIONED_T
+PREHOOK: query: alter table UN_PARTITIONED_T compact 'major'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@un_partitioned_t
+PREHOOK: Output: default@un_partitioned_t
+POSTHOOK: query: alter table UN_PARTITIONED_T compact 'major'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@un_partitioned_t
+POSTHOOK: Output: default@un_partitioned_t
+PREHOOK: query: alter table UN_PARTITIONED_T_MINOR compact 'minor'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@un_partitioned_t_minor
+PREHOOK: Output: default@un_partitioned_t_minor
+POSTHOOK: query: alter table UN_PARTITIONED_T_MINOR compact 'minor'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@un_partitioned_t_minor
+POSTHOOK: Output: default@un_partitioned_t_minor
+PREHOOK: query: alter table PARTITIONED_T add partition(dt='2023')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: alter table PARTITIONED_T add partition(dt='2023')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t@dt=2023
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2023') values ('k1','v1')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2023') values ('k1','v1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2023') values ('k2','v2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2023') values ('k2','v2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2023') values ('k3','v3')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2023') values ('k3','v3')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).val SCRIPT []
+PREHOOK: query: alter table PARTITIONED_T partition(dt='2023') compact 'minor'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@partitioned_t
+PREHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: query: alter table PARTITIONED_T partition(dt='2023') compact 'minor'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t@dt=2023
+PREHOOK: query: SHOW COMPACTIONS ORDER BY 'PARTITION' DESC
+PREHOOK: type: SHOW COMPACTIONS
+POSTHOOK: query: SHOW COMPACTIONS ORDER BY 'PARTITION' DESC
+POSTHOOK: type: SHOW COMPACTIONS
+CompactionId   Database        Table   Partition       Type    State   Worker host     Worker  Enqueue Time    Start Time      Duration(ms)    HadoopJobId     Error message   Initiator host  Initiator       Pool name       TxnId   Next TxnId      Commit Time     Highest WriteId
+#Masked#       default un_partitioned_t_minor   ---    MINOR   initiated        ---     ---    #Masked#         ---     ---     ---     ---    #Masked#        manual  default 0       0       0        --- 
+#Masked#       default un_partitioned_t         ---    MAJOR   initiated        ---     ---    #Masked#         ---     ---     ---     ---    #Masked#        manual  default 0       0       0        --- 
+#Masked#       default partitioned_t   dt=2023 MINOR   initiated        ---     ---    #Masked#         ---     ---     ---     ---    #Masked#        manual  default 0       0       0        --- 
+PREHOOK: query: alter table PARTITIONED_T add partition(dt='2024')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: alter table PARTITIONED_T add partition(dt='2024')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t@dt=2024
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k1','v1')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k1','v1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k2','v2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k2','v2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k3','v3')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k3','v3')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k4','v4')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k4','v4')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k5','v5')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k5','v5')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k6','v6')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k6','v6')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k7','v7')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k7','v7')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k8','v8')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k8','v8')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k9','v9')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k9','v9')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k10','v10')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k10','v10')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k11','v11')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values ('k11','v11')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k1','v1')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k1','v1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k2','v2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k2','v2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k3','v3')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k3','v3')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k4','v4')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k4','v4')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k5','v5')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k5','v5')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k6','v6')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k6','v6')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k7','v7')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k7','v7')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k8','v8')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k8','v8')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k9','v9')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k9','v9')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k10','v10')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k10','v10')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k11','v11')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values ('k11','v11')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: explain alter table PARTITIONED_T compact 'major'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@partitioned_t
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: explain alter table PARTITIONED_T compact 'major'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Compact
+      compaction type: major
+      table name: default.PARTITIONED_T
+      numberOfBuckets: 0
+      table name: default.PARTITIONED_T
+
+PREHOOK: query: alter table PARTITIONED_T compact 'major'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@partitioned_t
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: alter table PARTITIONED_T compact 'major'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t
+PREHOOK: query: SHOW COMPACTIONS ORDER BY 'PARTITION' DESC
+PREHOOK: type: SHOW COMPACTIONS
+POSTHOOK: query: SHOW COMPACTIONS ORDER BY 'PARTITION' DESC
+POSTHOOK: type: SHOW COMPACTIONS
+CompactionId	Database	Table	Partition	Type	State	Worker host	Worker	Enqueue Time	Start Time	Duration(ms)	HadoopJobId	Error message	Initiator host	Initiator	Pool name	TxnId	Next TxnId	Commit Time	Highest WriteId
+#Masked#	default	un_partitioned_t_minor	 --- 	MINOR	initiated	 --- 	 --- 	#Masked#	 --- 	 --- 	 --- 	 --- 	#Masked#	manual	default	0	0	0	 --- 
+#Masked#	default	un_partitioned_t	 --- 	MAJOR	initiated	 --- 	 --- 	#Masked#	 --- 	 --- 	 --- 	 --- 	#Masked#	manual	default	0	0	0	 --- 
+#Masked#	default	partitioned_t	dt=2024	MAJOR	initiated	 --- 	 --- 	#Masked#	 --- 	 --- 	 --- 	 --- 	#Masked#	manual	default	0	0	0	 --- 
+#Masked#	default	partitioned_t	dt=2023	MINOR	initiated	 --- 	 --- 	#Masked#	 --- 	 --- 	 --- 	 --- 	#Masked#	manual	default	0	0	0	 --- 
+#Masked#	default	partitioned_t	dt=2022	MAJOR	initiated	 --- 	 --- 	#Masked#	 --- 	 --- 	 --- 	 --- 	#Masked#	manual	default	0	0	0	 --- 
+PREHOOK: query: drop table UN_PARTITIONED_T
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@un_partitioned_t
+PREHOOK: Output: database:default
+PREHOOK: Output: default@un_partitioned_t
+POSTHOOK: query: drop table UN_PARTITIONED_T
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@un_partitioned_t
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@un_partitioned_t
+PREHOOK: query: drop table UN_PARTITIONED_T_MINOR
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@un_partitioned_t_minor
+PREHOOK: Output: database:default
+PREHOOK: Output: default@un_partitioned_t_minor
+POSTHOOK: query: drop table UN_PARTITIONED_T_MINOR
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@un_partitioned_t_minor
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@un_partitioned_t_minor
+PREHOOK: query: drop table PARTITIONED_T
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@partitioned_t
+PREHOOK: Output: database:default
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: drop table PARTITIONED_T
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@partitioned_t
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@partitioned_t
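
The golden output above shows the behavior this change introduces: a manual compaction request on a partitioned table without a PARTITION clause is fanned out into one queued compaction per partition. A minimal usage sketch of that flow (the table name "sales" and its columns are illustrative, not taken from this commit; any transactional partitioned table behaves the same):

    -- hypothetical ACID partitioned table
    CREATE TABLE sales (id INT, amount DOUBLE)
    PARTITIONED BY (dt STRING)
    STORED AS ORC
    TBLPROPERTIES ('transactional'='true');

    -- no PARTITION clause: a compaction request is enqueued for every partition
    ALTER TABLE sales COMPACT 'major';

    -- a single partition can still be targeted explicitly
    ALTER TABLE sales PARTITION (dt='2024') COMPACT 'minor';

    -- inspect the enqueued requests, as the test above does
    SHOW COMPACTIONS;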
