This is an automated email from the ASF dual-hosted git repository.

zabetak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit df45194268ffb10f9f7aae0ab4e3aec35a7b31d8
Author: Taraka Rama Rao Lethavadla <tarakaramarao.lethava...@gmail.com>
AuthorDate: Tue Nov 7 19:06:27 2023 +0530

    HIVE-27848: Refactor Initiator hierarchy into CompactorUtil and fix
    failure in TestCrudCompactorOnTez (Taraka Rama Rao Lethavadla reviewed by
    Stamatis Zampetakis)
    
    The work initially started as a fix for the failing test
    TestCrudCompactorOnTez.secondCompactionShouldBeRefusedBeforeEnqueueing.
    
    However, while changing the code to address the failure, the
    inheritance-based design for the Initiator chosen in HIVE-27598 revealed
    some weaknesses, briefly outlined below.
    
    Due to inheritance, the InitiatorBase class becomes a Thread, which does
    not really make sense and adds overhead every time it is instantiated.
    Moreover, the only class that currently extends InitiatorBase is the
    Initiator, and it is difficult to imagine other extensions of
    InitiatorBase; the code becomes complex, and any subtle change in
    InitiatorBase may have unpredictable effects on Initiator. Having a
    "Base" class that is not really meant to [...]
    
    For the reasons above, the focus of the work shifted from just re-enabling
    the test to addressing the shortcomings of the inheritance-based design of
    Initiator.
    
    Close apache/hive#4859
---
 .../ql/txn/compactor/TestCrudCompactorOnTez.java   |  48 +++-
 .../compact/AlterTableCompactOperation.java        |  83 +++---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |   7 +
 .../hive/ql/txn/compactor/CompactorUtil.java       | 250 +++++++++++++++++
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |   6 +-
 .../hive/ql/txn/compactor/InitiatorBase.java       | 307 ---------------------
 6 files changed, 357 insertions(+), 344 deletions(-)

diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index f64fac9038d..26302e6c143 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -77,7 +77,6 @@ import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.RecordReaderImpl;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
@@ -602,7 +601,6 @@ public class TestCrudCompactorOnTez extends 
CompactorOnTezTest {
   }
 
   @Test
-  @Ignore("HIVE-27848")
   public void secondCompactionShouldBeRefusedBeforeEnqueueing() throws 
Exception {
     conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
 
@@ -648,6 +646,52 @@ public class TestCrudCompactorOnTez extends 
CompactorOnTezTest {
     Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
   }
 
+  @Test
+  public void secondCompactionShouldBeRefusedBeforeEnqueueingForPartition() 
throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+
+    final String dbName = "default";
+    final String tableName = "compaction_test";
+    executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(id string, value 
string) partitioned by(pt string) CLUSTERED BY(id) "
+        + "INTO 10 BUCKETS STORED AS ORC 
TBLPROPERTIES('transactional'='true')", driver);
+    executeStatementOnDriver("alter table " + tableName + " add 
partition(pt='test')",driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " 
partition(pt='test') values ('1','one'),('2','two'),('3','three'),"
+        + 
"('4','four'),('5','five'),('6','six'),('7','seven'),('8','eight'),('9','nine'),('10','ten'),"
+        + 
"('11','eleven'),('12','twelve'),('13','thirteen'),('14','fourteen'),('15','fifteen'),('16','sixteen'),"
+        + 
"('17','seventeen'),('18','eighteen'),('19','nineteen'),('20','twenty')", 
driver);
+
+    executeStatementOnDriver("insert into " + tableName + " 
partition(pt='test') values ('21', 'value21'),('84', 'value84'),"
+        + "('66', 'value66'),('54', 'value54')", driver);
+    executeStatementOnDriver(
+        "insert into " + tableName + " partition(pt='test') values ('22', 
'value22'),('34', 'value34')," + "('35', 'value35')", driver);
+    executeStatementOnDriver("insert into " + tableName + " 
partition(pt='test') values ('75', 'value75'),('99', 'value99')", driver);
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    //Do a compaction directly and wait for it to finish
+    CompactionRequest rqst = new CompactionRequest(dbName, tableName, 
CompactionType.MAJOR);
+    rqst.setPartitionname("pt=test");
+    CompactionResponse resp = txnHandler.compact(rqst);
+    runWorker(conf);
+
+    //Try to do a second compaction on the same table before the cleaner runs.
+    try {
+      driver.run("ALTER TABLE " + tableName + " partition(pt='test') COMPACT 
'major'");
+    } catch (CommandProcessorException e) {
+      String errorMessage = ErrorMsg.COMPACTION_REFUSED.format(dbName, 
tableName, " partition(pt=test)",
+          "Compaction is already scheduled with state='ready for cleaning' and 
id=" + resp.getId());
+      Assert.assertEquals(errorMessage, e.getCauseMessage());
+      Assert.assertEquals(ErrorMsg.COMPACTION_REFUSED.getErrorCode(), 
e.getErrorCode());
+    }
+
+    //Check if the first compaction is in 'ready for cleaning'
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+  }
+
   @Test
   public void testMinorCompactionShouldBeRefusedOnTablesWithOriginalFiles() 
throws Exception {
     conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
index 8e86056053c..19f4b48d8c6 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.ddl.table.storage.compact;
 
+import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 
@@ -27,6 +28,7 @@ import 
org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.DDLOperation;
@@ -35,10 +37,13 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.txn.compactor.InitiatorBase;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
 
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.Optional;
 
 import static 
org.apache.hadoop.hive.ql.io.AcidUtils.compactionTypeStr2ThriftType;
 
@@ -46,6 +51,7 @@ import static 
org.apache.hadoop.hive.ql.io.AcidUtils.compactionTypeStr2ThriftTyp
  * Operation process of compacting a table.
  */
 public class AlterTableCompactOperation extends 
DDLOperation<AlterTableCompactDesc> {
+
   public AlterTableCompactOperation(DDLOperationContext context, 
AlterTableCompactDesc desc) {
     super(context, desc);
   }
@@ -56,6 +62,11 @@ public class AlterTableCompactOperation extends 
DDLOperation<AlterTableCompactDe
       throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, 
table.getDbName(), table.getTableName());
     }
 
+    Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
+        convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(context.getConf());
+
     CompactionRequest compactionRequest = new 
CompactionRequest(table.getDbName(), table.getTableName(),
         compactionTypeStr2ThriftType(desc.getCompactionType()));
 
@@ -69,40 +80,48 @@ public class AlterTableCompactOperation extends 
DDLOperation<AlterTableCompactDe
       compactionRequest.setNumberOfBuckets(desc.getNumberOfBuckets());
     }
 
-    InitiatorBase initiatorBase = new InitiatorBase();
-    initiatorBase.setConf(context.getConf());
-    initiatorBase.init(new AtomicBoolean());
-
-    Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
-        convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
-
-    if(desc.getPartitionSpec() != null){
-      Optional<String> partitionName =  
partitionMap.keySet().stream().findFirst();
-      partitionName.ifPresent(compactionRequest::setPartitionname);
-    }
-    List<CompactionResponse> compactionResponses =
-        initiatorBase.initiateCompactionForTable(compactionRequest, 
table.getTTable(), partitionMap);
-    for (CompactionResponse compactionResponse : compactionResponses) {
-      if (!compactionResponse.isAccepted()) {
-        String message;
-        if (compactionResponse.isSetErrormessage()) {
-          message = compactionResponse.getErrormessage();
-          throw new HiveException(ErrorMsg.COMPACTION_REFUSED, 
table.getDbName(), table.getTableName(),
-              "CompactionId: " + compactionResponse.getId(), message);
-        }
-        context.getConsole().printInfo(
-            "Compaction already enqueued with id " + 
compactionResponse.getId() + "; State is "
-                + compactionResponse.getState());
-        continue;
+    //Will directly initiate compaction if an un-partitioned table/a partition 
is specified in the request
+    if (desc.getPartitionSpec() != null || !table.isPartitioned()) {
+      if (desc.getPartitionSpec() != null) {
+        Optional<String> partitionName = 
partitionMap.keySet().stream().findFirst();
+        partitionName.ifPresent(compactionRequest::setPartitionname);
       }
-      context.getConsole().printInfo("Compaction enqueued with id " + 
compactionResponse.getId());
-      if (desc.isBlocking() && compactionResponse.isAccepted()) {
-        waitForCompactionToFinish(compactionResponse, context);
+      CompactionResponse compactionResponse = 
txnHandler.compact(compactionRequest);
+      parseCompactionResponse(compactionResponse, table, 
compactionRequest.getPartitionname());
+    } else { // Check for eligible partitions and initiate compaction
+      for (Map.Entry<String, org.apache.hadoop.hive.metastore.api.Partition> 
partitionMapEntry : partitionMap.entrySet()) {
+        compactionRequest.setPartitionname(partitionMapEntry.getKey());
+        CompactionResponse compactionResponse =
+            CompactorUtil.initiateCompactionForPartition(table.getTTable(), 
partitionMapEntry.getValue(),
+                compactionRequest, ServerUtils.hostname(), txnHandler, 
context.getConf());
+        parseCompactionResponse(compactionResponse, table, 
partitionMapEntry.getKey());
       }
     }
     return 0;
   }
 
+  private void parseCompactionResponse(CompactionResponse compactionResponse, 
Table table, String partitionName)
+      throws HiveException {
+    if (compactionResponse == null) {
+      context.getConsole().printInfo(
+          "Not enough deltas to initiate compaction for table=" + 
table.getTableName() + "partition=" + partitionName);
+      return;
+    }
+    if (!compactionResponse.isAccepted()) {
+      if (compactionResponse.isSetErrormessage()) {
+        throw new HiveException(ErrorMsg.COMPACTION_REFUSED, 
table.getDbName(), table.getTableName(),
+            partitionName == null ? "" : " partition(" + partitionName + ")", 
compactionResponse.getErrormessage());
+      }
+      context.getConsole().printInfo(
+          "Compaction already enqueued with id " + compactionResponse.getId() 
+ "; State is " + compactionResponse.getState());
+      return;
+    }
+    context.getConsole().printInfo("Compaction enqueued with id " + 
compactionResponse.getId());
+    if (desc.isBlocking() && compactionResponse.isAccepted()) {
+      waitForCompactionToFinish(compactionResponse, context);
+    }
+  }
+
   private List<Partition> getPartitions(Table table, AlterTableCompactDesc 
desc, DDLOperationContext context)
       throws HiveException {
     List<Partition> partitions = new ArrayList<>();
@@ -117,7 +136,7 @@ public class AlterTableCompactOperation extends 
DDLOperation<AlterTableCompactDe
       partitions = context.getDb().getPartitions(table, partitionSpec);
       if (partitions.size() > 1) {
         throw new HiveException(ErrorMsg.TOO_MANY_COMPACTION_PARTITIONS);
-      } else if (partitions.size() == 0) {
+      } else if (partitions.isEmpty()) {
         throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 4c63a74d853..7b7aac818e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1417,6 +1417,13 @@ public class AcidUtils {
     return directory;
   }
 
+  public static AcidDirectory getAcidState(StorageDescriptor sd, 
ValidWriteIdList writeIds, HiveConf conf)
+      throws IOException {
+    Path location = new Path(sd.getLocation());
+    FileSystem fs = location.getFileSystem(conf);
+    return getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
+  }
+
   private static void findBestWorkingDeltas(ValidWriteIdList writeIdList, 
AcidDirectory directory) {
     Collections.sort(directory.getCurrentDirectories());
     //so now, 'current directories' should be sorted like delta_5_20 
delta_5_10 delta_11_20 delta_51_60 for example
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
index c8d86c7ea27..70c550375a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
@@ -17,38 +17,64 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.LockComponentBuilder;
 import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.StringableMap;
 import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -59,6 +85,8 @@ import static 
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa
 public class CompactorUtil {
   private static final Logger LOG = 
LoggerFactory.getLogger(CompactorUtil.class);
   public static final String COMPACTOR = "compactor";
+  private static final String COMPACTOR_THRESHOLD_PREFIX = 
"compactorthreshold.";
+
   /**
    * List of accepted properties for defining the compactor's job queue.
    *
@@ -295,4 +323,226 @@ public class CompactorUtil {
         !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
     return requestBuilder.build();
   }
+
+  private static CompactionResponse requestCompaction(CompactionInfo ci, 
String runAs, String hostname,
+      TxnStore txnHandler) throws MetaException {
+    CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, 
ci.tableName, ci.type);
+    if (ci.partName != null)
+      compactionRequest.setPartitionname(ci.partName);
+    compactionRequest.setRunas(runAs);
+    if (StringUtils.isEmpty(ci.initiatorId)) {
+      compactionRequest.setInitiatorId(hostname + "-" + 
Thread.currentThread().getId());
+    } else {
+      compactionRequest.setInitiatorId(ci.initiatorId);
+    }
+    compactionRequest.setInitiatorVersion(ci.initiatorVersion);
+    compactionRequest.setPoolName(ci.poolName);
+    LOG.info("Requesting compaction: " + compactionRequest);
+    CompactionResponse resp = txnHandler.compact(compactionRequest);
+    if (resp.isAccepted()) {
+      ci.id = resp.getId();
+    }
+    return resp;
+  }
+
+  private static CompactionType determineCompactionType(CompactionInfo ci, 
AcidDirectory dir,
+      Map<String, String> tblProperties, long baseSize, long deltaSize, 
HiveConf conf) {
+    boolean noBase = false;
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+    if (baseSize == 0 && deltaSize > 0) {
+      noBase = true;
+    } else {
+      String deltaPctProp =
+          tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + 
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+      float deltaPctThreshold = deltaPctProp == null ? 
HiveConf.getFloatVar(conf,
+          HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : 
Float.parseFloat(deltaPctProp);
+      boolean bigEnough = (float) deltaSize / (float) baseSize > 
deltaPctThreshold;
+      boolean multiBase = dir.getObsolete().stream().anyMatch(path -> 
path.getName().startsWith(AcidUtils.BASE_PREFIX));
+
+      boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
+      if (LOG.isDebugEnabled()) {
+        StringBuilder msg = new StringBuilder("delta size: ");
+        msg.append(deltaSize);
+        msg.append(" base size: ");
+        msg.append(baseSize);
+        msg.append(" multiBase ");
+        msg.append(multiBase);
+        msg.append(" deltaSize ");
+        msg.append(deltaSize);
+        msg.append(" threshold: ");
+        msg.append(deltaPctThreshold);
+        msg.append(" delta/base ratio > 
").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
+            .append(": ");
+        msg.append(bigEnough);
+        msg.append(".");
+        if (!initiateMajor) {
+          msg.append("not");
+        }
+        msg.append(" initiating major compaction.");
+        LOG.debug(msg.toString());
+      }
+      if (initiateMajor)
+        return CompactionType.MAJOR;
+    }
+
+    String deltaNumProp =
+        tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + 
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+    int deltaNumThreshold = deltaNumProp == null ? HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : 
Integer.parseInt(deltaNumProp);
+    boolean enough = deltas.size() > deltaNumThreshold;
+    if (!enough) {
+      LOG.debug(
+          "Not enough deltas to initiate compaction for table=" + ci.tableName 
+ "partition=" + ci.partName
+              + ". Found: " + deltas.size() + " deltas, threshold is " + 
deltaNumThreshold);
+      return null;
+    }
+    // If there's no base file, do a major compaction
+    LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" 
: "has") + " base," + "requesting "
+        + (noBase ? "major" : "minor") + " compaction");
+
+    return noBase || !isMinorCompactionSupported(conf, tblProperties,
+        dir) ? CompactionType.MAJOR : CompactionType.MINOR;
+  }
+
+  private static long getBaseSize(AcidDirectory dir) throws IOException {
+    long baseSize = 0;
+    if (dir.getBase() != null) {
+      baseSize = getDirSize(dir.getFs(), dir.getBase());
+    } else {
+      for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) 
{
+        baseSize += origStat.getFileStatus().getLen();
+      }
+    }
+    return baseSize;
+  }
+
+  private static long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) 
throws IOException {
+    return dir.getFiles(fs, 
Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
+        .mapToLong(FileStatus::getLen).sum();
+  }
+
+  private static CompactionType checkForCompaction(final CompactionInfo ci, 
final ValidWriteIdList writeIds,
+      final StorageDescriptor sd, final Map<String, String> tblProperties, 
final String runAs, TxnStore txnHandler,
+      HiveConf conf) throws IOException, InterruptedException {
+    // If it's marked as too many aborted, we already know we need to compact
+    if (ci.tooManyAborts) {
+      LOG.debug("Found too many aborted transactions for "
+          + ci.getFullPartitionName() + ", " + "initiating major compaction");
+      return CompactionType.MAJOR;
+    }
+
+    if (ci.hasOldAbort) {
+      HiveConf.ConfVars oldAbortedTimeoutProp = 
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
+      LOG.debug(
+          "Found an aborted transaction for " + ci.getFullPartitionName() + " 
with age older than threshold "
+              + oldAbortedTimeoutProp + ": " + 
conf.getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
+              + "Initiating minor compaction.");
+      return CompactionType.MINOR;
+    }
+    AcidDirectory acidDirectory = AcidUtils.getAcidState(sd, writeIds, conf);
+    long baseSize = getBaseSize(acidDirectory);
+    FileSystem fs = acidDirectory.getFs();
+    Map<Path, Long> deltaSizes = new HashMap<>();
+    for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
+      deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
+    }
+    long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
+    AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, 
ci.partName, conf, txnHandler, baseSize,
+        deltaSizes, acidDirectory.getObsolete());
+
+    if (runJobAsSelf(runAs)) {
+      return determineCompactionType(ci, acidDirectory, tblProperties, 
baseSize, deltaSize, conf);
+    } else {
+      LOG.info("Going to initiate as user " + runAs + " for " + 
ci.getFullPartitionName());
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, 
UserGroupInformation.getLoginUser());
+      CompactionType compactionType;
+      try {
+        compactionType = ugi.doAs(
+            (PrivilegedExceptionAction<CompactionType>) () -> 
determineCompactionType(ci, acidDirectory, tblProperties,
+                baseSize, deltaSize, conf));
+      } finally {
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi + 
" for " + ci.getFullPartitionName(),
+              exception);
+        }
+      }
+      return compactionType;
+    }
+  }
+
+  private static ValidWriteIdList resolveValidWriteIds(Table t, TxnStore 
txnHandler, HiveConf conf)
+      throws NoSuchTxnException, MetaException {
+    ValidTxnList validTxnList = new 
ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
+    // The response will have one entry per table and hence we get only one 
ValidWriteIdList
+    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), 
t.getTableName());
+    GetValidWriteIdsRequest validWriteIdsRequest =
+        new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+    validWriteIdsRequest.setValidTxnList(validTxnList.writeToString());
+
+    return TxnUtils.createValidCompactWriteIdList(
+        
txnHandler.getValidWriteIds(validWriteIdsRequest).getTblValidWriteIds().get(0));
+  }
+
+  public static CompactionResponse scheduleCompactionIfRequired(CompactionInfo 
ci, Table t, Partition p, String runAs,
+      boolean metricsEnabled, String hostName, TxnStore txnHandler, HiveConf 
conf) throws MetaException {
+    StorageDescriptor sd = resolveStorageDescriptor(t, p);
+    try {
+      ValidWriteIdList validWriteIds = resolveValidWriteIds(t, txnHandler, 
conf);
+
+      checkInterrupt(Initiator.class.getName());
+
+      CompactionType type = checkForCompaction(ci, validWriteIds, sd, 
t.getParameters(), runAs, txnHandler, conf);
+      if (type != null) {
+        ci.type = type;
+        return requestCompaction(ci, runAs, hostName, txnHandler);
+      }
+    } catch (InterruptedException e) {
+      //Handle InterruptedException separately so the compactionInfo won't be 
marked as failed.
+      LOG.info("Initiator pool is being shut down, task received 
interruption.");
+    } catch (Throwable ex) {
+      String errorMessage = "Caught exception while trying to determine if we 
should compact " + ci
+              + ". Marking " + "failed to avoid repeated failures, " + ex;
+      LOG.error(errorMessage);
+      ci.errorMessage = errorMessage;
+      if (metricsEnabled) {
+        
Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc();
+      }
+      txnHandler.markFailed(ci);
+    }
+    return null;
+  }
+
+  public static CompactionResponse initiateCompactionForPartition(Table table, 
Partition partition,
+      CompactionRequest compactionRequest, String hostName, TxnStore 
txnHandler, HiveConf inputConf) throws MetaException {
+    ValidTxnList validTxnList = 
TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
+    HiveConf conf = new HiveConf(inputConf);
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    CompactionResponse compactionResponse;
+    CompactionInfo compactionInfo =
+        new CompactionInfo(table.getDbName(), table.getTableName(), 
compactionRequest.getPartitionname(),
+            compactionRequest.getType());
+    compactionInfo.initiatorId = compactionRequest.getInitiatorId();
+    compactionInfo.orderByClause = compactionRequest.getOrderByClause();
+    compactionInfo.initiatorVersion = compactionRequest.getInitiatorVersion();
+    if (compactionRequest.getNumberOfBuckets() > 0) {
+      compactionInfo.numberOfBuckets = compactionRequest.getNumberOfBuckets();
+    }
+    compactionInfo.poolName = compactionRequest.getPoolName();
+    try {
+      StorageDescriptor sd = resolveStorageDescriptor(table, partition);
+      String runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
+      LOG.info("Checking to see if we should compact partition {} of table 
{}.{}", compactionInfo.partName,
+          table.getDbName(), table.getTableName());
+      compactionResponse =
+          scheduleCompactionIfRequired(compactionInfo, table, partition, 
runAs, false, hostName,
+              txnHandler, conf);
+    } catch (IOException | InterruptedException | MetaException e) {
+      LOG.error("Error occurred while Checking if we should compact partition 
{} of table {}.{} Exception: {}",
+          compactionInfo.partName, table.getDbName(), table.getTableName(), 
e.getMessage());
+      throw new RuntimeException(e);
+    }
+    return compactionResponse;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 58cb478bbe8..75c435a3123 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -51,7 +51,7 @@ import static 
org.apache.hadoop.hive.conf.Constants.COMPACTOR_INTIATOR_THREAD_NA
  * A class to initiate compactions.  This will run in a separate thread.
  * It's critical that there exactly 1 of these in a given warehouse.
  */
-public class Initiator extends InitiatorBase {
+public class Initiator extends MetaStoreCompactorThread {
   static final private String CLASS_NAME = Initiator.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
@@ -153,11 +153,11 @@ public class Initiator extends InitiatorBase {
                * Therefore, using a thread pool here and running 
checkForCompactions in parallel */
               String tableName = ci.getFullTableName();
               String partition = ci.getFullPartitionName();
-
+              ci.initiatorVersion = this.runtimeVersion;
               CompletableFuture<Void> asyncJob =
                   CompletableFuture.runAsync(
                           CompactorUtil.ThrowingRunnable.unchecked(() ->
-                              scheduleCompactionIfRequired(ci, t, p, runAs, 
metricsEnabled)), compactionExecutor)
+                                  
CompactorUtil.scheduleCompactionIfRequired(ci, t, p, runAs, metricsEnabled, 
hostName, txnHandler, conf)), compactionExecutor)
                       .exceptionally(exc -> {
                         LOG.error("Error while running scheduling the 
compaction on the table {} / partition {}.", tableName, partition, exc);
                         return null;
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java
deleted file mode 100644
index 8f632dbd398..00000000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.txn.compactor;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionResponse;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
-import org.apache.hadoop.hive.metastore.metrics.Metrics;
-import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
-import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.ql.io.AcidDirectory;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.common.util.Ref;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-public class InitiatorBase extends MetaStoreCompactorThread {
-
-  static final private String COMPACTOR_THRESHOLD_PREFIX = 
"compactorthreshold.";
-
-  private List<CompactionResponse> 
initiateCompactionForMultiplePartitions(Table table,
-      Map<String, Partition> partitions, CompactionRequest request) {
-    List<CompactionResponse> compactionResponses = new ArrayList<>();
-    partitions.entrySet().parallelStream().forEach(entry -> {
-      try {
-        StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(table, 
entry.getValue());
-        String runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
-        CompactionInfo ci =
-            new CompactionInfo(table.getDbName(), table.getTableName(), 
entry.getKey(), request.getType());
-        ci.initiatorId = request.getInitiatorId();
-        ci.orderByClause = request.getOrderByClause();
-        ci.initiatorVersion = request.getInitiatorVersion();
-        if (request.getNumberOfBuckets() > 0) {
-          ci.numberOfBuckets = request.getNumberOfBuckets();
-        }
-        ci.poolName = request.getPoolName();
-        LOG.info(
-            "Checking to see if we should compact partition " + entry.getKey() 
+ " of table " + table.getDbName() + "."
-                + table.getTableName());
-        CollectionUtils.addIgnoreNull(compactionResponses,
-            scheduleCompactionIfRequired(ci, table, entry.getValue(), runAs, 
false));
-      } catch (IOException | InterruptedException | MetaException e) {
-        LOG.error(
-            "Error occurred while Checking if we should compact partition " + 
entry.getKey() + " of table " + table.getDbName() + "."
-                + table.getTableName() + " Exception: " + e.getMessage());
-        throw new RuntimeException(e);
-      }
-    });
-    return compactionResponses;
-  }
-
-  public List<CompactionResponse> initiateCompactionForTable(CompactionRequest 
request, Table table, Map<String, Partition> partitions) throws Exception {
-    ValidTxnList validTxnList = 
TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
-    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
-
-    if (request.getPartitionname()!= null || partitions.isEmpty()) {
-      List<CompactionResponse> responses = new ArrayList<>();
-      responses.add(txnHandler.compact(request));
-      return responses;
-    } else {
-      return initiateCompactionForMultiplePartitions(table, partitions, 
request);
-    }
-  }
-
-  @Override protected boolean isCacheEnabled() {
-    return false;
-  }
-
-  private String getInitiatorId(long threadId) {
-    return this.hostName + "-" + threadId;
-  }
-
-  private CompactionResponse requestCompaction(CompactionInfo ci, String 
runAs) throws MetaException {
-    CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, 
ci.tableName, ci.type);
-    if (ci.partName != null)
-      compactionRequest.setPartitionname(ci.partName);
-    compactionRequest.setRunas(runAs);
-    if (StringUtils.isEmpty(ci.initiatorId)) {
-      
compactionRequest.setInitiatorId(getInitiatorId(Thread.currentThread().getId()));
-    } else {
-      compactionRequest.setInitiatorId(ci.initiatorId);
-    }
-    compactionRequest.setInitiatorVersion(this.runtimeVersion);
-    compactionRequest.setPoolName(ci.poolName);
-    LOG.info("Requesting compaction: " + compactionRequest);
-    CompactionResponse resp = txnHandler.compact(compactionRequest);
-    if (resp.isAccepted()) {
-      ci.id = resp.getId();
-    }
-    return resp;
-  }
-
-  private AcidDirectory getAcidDirectory(StorageDescriptor sd, 
ValidWriteIdList writeIds) throws IOException {
-    Path location = new Path(sd.getLocation());
-    FileSystem fs = location.getFileSystem(conf);
-    return AcidUtils.getAcidState(fs, location, conf, writeIds, 
Ref.from(false), false);
-  }
-
-  private CompactionType determineCompactionType(CompactionInfo ci, 
AcidDirectory dir,
-      Map<String, String> tblProperties, long baseSize, long deltaSize) {
-    boolean noBase = false;
-    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
-    if (baseSize == 0 && deltaSize > 0) {
-      noBase = true;
-    } else {
-      String deltaPctProp =
-          tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + 
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
-      float deltaPctThreshold = deltaPctProp == null ? 
HiveConf.getFloatVar(conf,
-          HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : 
Float.parseFloat(deltaPctProp);
-      boolean bigEnough = (float) deltaSize / (float) baseSize > 
deltaPctThreshold;
-      boolean multiBase = dir.getObsolete().stream().anyMatch(path -> 
path.getName().startsWith(AcidUtils.BASE_PREFIX));
-
-      boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
-      if (LOG.isDebugEnabled()) {
-        StringBuilder msg = new StringBuilder("delta size: ");
-        msg.append(deltaSize);
-        msg.append(" base size: ");
-        msg.append(baseSize);
-        msg.append(" multiBase ");
-        msg.append(multiBase);
-        msg.append(" deltaSize ");
-        msg.append(deltaSize);
-        msg.append(" threshold: ");
-        msg.append(deltaPctThreshold);
-        msg.append(" delta/base ratio > 
").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
-            .append(": ");
-        msg.append(bigEnough);
-        msg.append(".");
-        if (!initiateMajor) {
-          msg.append("not");
-        }
-        msg.append(" initiating major compaction.");
-        LOG.debug(msg.toString());
-      }
-      if (initiateMajor)
-        return CompactionType.MAJOR;
-    }
-
-    String deltaNumProp =
-        tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + 
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
-    int deltaNumThreshold = deltaNumProp == null ? HiveConf.getIntVar(conf,
-        HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : 
Integer.parseInt(deltaNumProp);
-    boolean enough = deltas.size() > deltaNumThreshold;
-    if (!enough) {
-      LOG.debug("Not enough deltas to initiate compaction for table=" + 
ci.tableName + "partition=" + ci.partName
-          + ". Found: " + deltas.size() + " deltas, threshold is " + 
deltaNumThreshold);
-      return null;
-    }
-    // If there's no base file, do a major compaction
-    LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" 
: "has") + " base," + "requesting "
-        + (noBase ? "major" : "minor") + " compaction");
-
-    return noBase || !CompactorUtil.isMinorCompactionSupported(conf, 
tblProperties, dir) ? CompactionType.MAJOR : CompactionType.MINOR;
-  }
-
-  private long getBaseSize(AcidDirectory dir) throws IOException {
-    long baseSize = 0;
-    if (dir.getBase() != null) {
-      baseSize = getDirSize(dir.getFs(), dir.getBase());
-    } else {
-      for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) 
{
-        baseSize += origStat.getFileStatus().getLen();
-      }
-    }
-    return baseSize;
-  }
-
-  private long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) throws 
IOException {
-    return dir.getFiles(fs, 
Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
-        .mapToLong(FileStatus::getLen).sum();
-  }
-
-  private CompactionType checkForCompaction(final CompactionInfo ci, final 
ValidWriteIdList writeIds,
-      final StorageDescriptor sd, final Map<String, String> tblProperties, 
final String runAs)
-      throws IOException, InterruptedException {
-    // If it's marked as too many aborted, we already know we need to compact
-    if (ci.tooManyAborts) {
-      LOG.debug("Found too many aborted transactions for " + 
ci.getFullPartitionName() + ", "
-          + "initiating major compaction");
-      return CompactionType.MAJOR;
-    }
-
-    if (ci.hasOldAbort) {
-      HiveConf.ConfVars oldAbortedTimeoutProp = 
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
-      LOG.debug("Found an aborted transaction for " + 
ci.getFullPartitionName() + " with age older than threshold "
-          + oldAbortedTimeoutProp + ": " + 
conf.getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
-          + "Initiating minor compaction.");
-      return CompactionType.MINOR;
-    }
-    AcidDirectory acidDirectory = getAcidDirectory(sd, writeIds);
-    long baseSize = getBaseSize(acidDirectory);
-    FileSystem fs = acidDirectory.getFs();
-    Map<Path, Long> deltaSizes = new HashMap<>();
-    for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
-      deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
-    }
-    long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
-    AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, 
ci.partName, conf, txnHandler, baseSize,
-        deltaSizes, acidDirectory.getObsolete());
-
-    if (CompactorUtil.runJobAsSelf(runAs)) {
-      return determineCompactionType(ci, acidDirectory, tblProperties, 
baseSize, deltaSize);
-    } else {
-      LOG.info("Going to initiate as user " + runAs + " for " + 
ci.getFullPartitionName());
-      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, 
UserGroupInformation.getLoginUser());
-      CompactionType compactionType;
-      try {
-        compactionType = ugi.doAs(
-            (PrivilegedExceptionAction<CompactionType>) () -> 
determineCompactionType(ci, acidDirectory, tblProperties,
-                baseSize, deltaSize));
-      } finally {
-        try {
-          FileSystem.closeAllForUGI(ugi);
-        } catch (IOException exception) {
-          LOG.error("Could not clean up file-system handles for UGI: " + ugi + 
" for " + ci.getFullPartitionName(),
-              exception);
-        }
-      }
-      return compactionType;
-    }
-  }
-
-  private ValidWriteIdList resolveValidWriteIds(Table t)
-      throws NoSuchTxnException, MetaException {
-    ValidTxnList validTxnList = new 
ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
-    // The response will have one entry per table and hence we get only one 
ValidWriteIdList
-    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), 
t.getTableName());
-    GetValidWriteIdsRequest validWriteIdsRequest = new 
GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
-    validWriteIdsRequest.setValidTxnList(validTxnList.writeToString());
-
-    return 
TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(validWriteIdsRequest).getTblValidWriteIds().get(0));
-  }
-
-  protected CompactionResponse scheduleCompactionIfRequired(CompactionInfo ci, 
Table t,
-      Partition p, String runAs, boolean metricsEnabled)
-      throws MetaException {
-    StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(t, p);
-    try {
-      ValidWriteIdList validWriteIds = resolveValidWriteIds(t);
-
-      CompactorUtil.checkInterrupt(InitiatorBase.class.getName());
-
-      CompactionType type = checkForCompaction(ci, validWriteIds, sd, 
t.getParameters(), runAs);
-      if (type != null) {
-        ci.type = type;
-        return requestCompaction(ci, runAs);
-      }
-    } catch (InterruptedException e) {
-      //Handle InterruptedException separately so the compactionInfo won't be 
marked as failed.
-      LOG.info("Initiator pool is being shut down, task received 
interruption.");
-    } catch (Throwable ex) {
-      String errorMessage = "Caught exception while trying to determine if we 
should compact " + ci + ". Marking "
-          + "failed to avoid repeated failures, " + ex;
-      LOG.error(errorMessage);
-      ci.errorMessage = errorMessage;
-      if (metricsEnabled) {
-        
Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc();
-      }
-      txnHandler.markFailed(ci);
-    }
-    return null;
-  }
-
-}

