zabetak commented on code in PR #4859:
URL: https://github.com/apache/hive/pull/4859#discussion_r1505717881
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java:
##########
@@ -295,4 +324,230 @@ public static LockRequest createLockRequest(HiveConf conf, CompactionInfo ci, lo
         !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
     return requestBuilder.build();
   }
+
+  private static CompactionResponse requestCompaction(CompactionInfo ci, String runAs, String hostname,
+      String runtimeVersion, TxnStore txnHandler) throws MetaException {
+    CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, ci.tableName, ci.type);
+    if (ci.partName != null)
+      compactionRequest.setPartitionname(ci.partName);
+    compactionRequest.setRunas(runAs);
+    if (StringUtils.isEmpty(ci.initiatorId)) {
+      compactionRequest.setInitiatorId(hostname + "-" + Thread.currentThread().getId());
+    } else {
+      compactionRequest.setInitiatorId(ci.initiatorId);
+    }
+    compactionRequest.setInitiatorVersion(runtimeVersion);
+    compactionRequest.setPoolName(ci.poolName);
+    LOG.info("Requesting compaction: " + compactionRequest);
+    CompactionResponse resp = txnHandler.compact(compactionRequest);
+    if (resp.isAccepted()) {
+      ci.id = resp.getId();
+    }
+    return resp;
+  }
+
+  private static CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir,
+      Map<String, String> tblProperties, long baseSize, long deltaSize, HiveConf conf) {
+    boolean noBase = false;
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+    if (baseSize == 0 && deltaSize > 0) {
+      noBase = true;
+    } else {
+      String deltaPctProp =
+          tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+      float deltaPctThreshold = deltaPctProp == null ? HiveConf.getFloatVar(conf,
+          HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : Float.parseFloat(deltaPctProp);
+      boolean bigEnough = (float) deltaSize / (float) baseSize > deltaPctThreshold;
+      boolean multiBase = dir.getObsolete().stream().anyMatch(path -> path.getName().startsWith(AcidUtils.BASE_PREFIX));
+
+      boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
+      if (LOG.isDebugEnabled()) {
+        StringBuilder msg = new StringBuilder("delta size: ");
+        msg.append(deltaSize);
+        msg.append(" base size: ");
+        msg.append(baseSize);
+        msg.append(" multiBase ");
+        msg.append(multiBase);
+        msg.append(" deltaSize ");
+        msg.append(deltaSize);
+        msg.append(" threshold: ");
+        msg.append(deltaPctThreshold);
+        msg.append(" delta/base ratio > ").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
+            .append(": ");
+        msg.append(bigEnough);
+        msg.append(".");
+        if (!initiateMajor) {
+          msg.append("not");
+        }
+        msg.append(" initiating major compaction.");
+        LOG.debug(msg.toString());
+      }
+      if (initiateMajor)
+        return CompactionType.MAJOR;
+    }
+
+    String deltaNumProp =
+        tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+    int deltaNumThreshold = deltaNumProp == null ? HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : Integer.parseInt(deltaNumProp);
+    boolean enough = deltas.size() > deltaNumThreshold;
+    if (!enough) {
+      LOG.debug(
+          "Not enough deltas to initiate compaction for table=" + ci.tableName + "partition=" + ci.partName
+              + ". Found: " + deltas.size() + " deltas, threshold is " + deltaNumThreshold);
+      return null;
+    }
+    // If there's no base file, do a major compaction
+    LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" : "has") + " base," + "requesting "
+        + (noBase ? "major" : "minor") + " compaction");
+
+    return noBase || !isMinorCompactionSupported(conf, tblProperties,
+        dir) ? CompactionType.MAJOR : CompactionType.MINOR;
+  }
+
+  private static long getBaseSize(AcidDirectory dir) throws IOException {
+    long baseSize = 0;
+    if (dir.getBase() != null) {
+      baseSize = getDirSize(dir.getFs(), dir.getBase());
+    } else {
+      for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) {
+        baseSize += origStat.getFileStatus().getLen();
+      }
+    }
+    return baseSize;
+  }
+
+  private static long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) throws IOException {
+    return dir.getFiles(fs, Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
+        .mapToLong(FileStatus::getLen).sum();
+  }
+
+  private static CompactionType checkForCompaction(final CompactionInfo ci, final ValidWriteIdList writeIds,
+      final StorageDescriptor sd, final Map<String, String> tblProperties, final String runAs, TxnStore txnHandler,
+      HiveConf conf) throws IOException, InterruptedException {
+    // If it's marked as too many aborted, we already know we need to compact
+    if (ci.tooManyAborts) {
+      LOG.debug("Found too many aborted transactions for "
+          + ci.getFullPartitionName() + ", " + "initiating major compaction");
+      return CompactionType.MAJOR;
+    }
+
+    if (ci.hasOldAbort) {
+      HiveConf.ConfVars oldAbortedTimeoutProp = HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
+      LOG.debug(
+          "Found an aborted transaction for " + ci.getFullPartitionName() + " with age older than threshold "
+              + oldAbortedTimeoutProp + ": " + conf.getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
+              + "Initiating minor compaction.");
+      return CompactionType.MINOR;
+    }
+    AcidDirectory acidDirectory = AcidUtils.getAcidState(sd, writeIds, conf);
+    long baseSize = getBaseSize(acidDirectory);
+    FileSystem fs = acidDirectory.getFs();
+    Map<Path, Long> deltaSizes = new HashMap<>();
+    for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
+      deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
+    }
+    long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
+    AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, ci.partName, conf, txnHandler, baseSize,
+        deltaSizes, acidDirectory.getObsolete());
+
+    if (runJobAsSelf(runAs)) {
+      return determineCompactionType(ci, acidDirectory, tblProperties, baseSize, deltaSize, conf);
+    } else {
+      LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName());
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, UserGroupInformation.getLoginUser());
+      CompactionType compactionType;
+      try {
+        compactionType = ugi.doAs(
+            (PrivilegedExceptionAction<CompactionType>) () -> determineCompactionType(ci, acidDirectory, tblProperties,
+                baseSize, deltaSize, conf));
+      } finally {
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + ci.getFullPartitionName(),
+              exception);
+        }
+      }
+      return compactionType;
+    }
+  }
+
+  private static ValidWriteIdList resolveValidWriteIds(Table t, TxnStore txnHandler, HiveConf conf)
+      throws NoSuchTxnException, MetaException {
+    ValidTxnList validTxnList = new ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
+    // The response will have one entry per table and hence we get only one ValidWriteIdList
+    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+    GetValidWriteIdsRequest validWriteIdsRequest =
+        new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+    validWriteIdsRequest.setValidTxnList(validTxnList.writeToString());
+
+    return TxnUtils.createValidCompactWriteIdList(
+        txnHandler.getValidWriteIds(validWriteIdsRequest).getTblValidWriteIds().get(0));
+  }
+
+  public static CompactionResponse scheduleCompactionIfRequired(CompactionInfo ci, Table t, Partition p, String runAs,
+      boolean metricsEnabled, String hostName, TxnStore txnHandler, HiveConf conf) throws MetaException {
+    StorageDescriptor sd = resolveStorageDescriptor(t, p);
+    try {
+      ValidWriteIdList validWriteIds = resolveValidWriteIds(t, txnHandler, conf);
+
+      checkInterrupt(Initiator.class.getName());
+
+      CompactionType type = checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs, txnHandler, conf);
+      if (type != null) {
+        ci.type = type;
+        return requestCompaction(ci, runAs, hostName, ci.initiatorVersion, txnHandler);
Review Comment:
nit: Since we are already passing `ci` into the method, the `ci.initiatorVersion` argument is redundant and can be dropped.
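Roughly what I have in mind (untested sketch, mirroring the method body above but reading the version from `ci`):

```java
// Sketch: drop the redundant runtimeVersion parameter and use ci.initiatorVersion directly.
private static CompactionResponse requestCompaction(CompactionInfo ci, String runAs, String hostname,
    TxnStore txnHandler) throws MetaException {
  CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, ci.tableName, ci.type);
  if (ci.partName != null) {
    compactionRequest.setPartitionname(ci.partName);
  }
  compactionRequest.setRunas(runAs);
  if (StringUtils.isEmpty(ci.initiatorId)) {
    compactionRequest.setInitiatorId(hostname + "-" + Thread.currentThread().getId());
  } else {
    compactionRequest.setInitiatorId(ci.initiatorId);
  }
  compactionRequest.setInitiatorVersion(ci.initiatorVersion);
  compactionRequest.setPoolName(ci.poolName);
  LOG.info("Requesting compaction: " + compactionRequest);
  CompactionResponse resp = txnHandler.compact(compactionRequest);
  if (resp.isAccepted()) {
    ci.id = resp.getId();
  }
  return resp;
}
```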
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java:
##########
@@ -32,23 +38,44 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
Review Comment:
nit: Is the import order correct? I was expecting that
`api.CompactionResponse` would be before `api.Partition` etc.
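For reference, an alphabetical ordering of the touched `api` imports would look roughly like this (just a sketch of what I had in mind):

```java
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
```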
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java:
##########
@@ -295,4 +324,232 @@ public static LockRequest createLockRequest(HiveConf conf, CompactionInfo ci, lo
         !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
     return requestBuilder.build();
   }
+
+  public static String getInitiatorId(long threadId, String hostName) {
+    return hostName + "-" + threadId;
+  }
+
+  public static CompactionResponse requestCompaction(CompactionInfo ci, String runAs, String hostname, String runtimeVersion, TxnStore txnHandler) throws MetaException {
+    CompactionRequest compactionRequest = new CompactionRequest(ci.dbname, ci.tableName, ci.type);
+    if (ci.partName != null)
+      compactionRequest.setPartitionname(ci.partName);
+    compactionRequest.setRunas(runAs);
+    if (StringUtils.isEmpty(ci.initiatorId)) {
+      compactionRequest.setInitiatorId(getInitiatorId(Thread.currentThread().getId(),hostname));
+    } else {
+      compactionRequest.setInitiatorId(ci.initiatorId);
+    }
+    compactionRequest.setInitiatorVersion(runtimeVersion);
+    compactionRequest.setPoolName(ci.poolName);
+    LOG.info("Requesting compaction: " + compactionRequest);
+    CompactionResponse resp = txnHandler.compact(compactionRequest);
+    if (resp.isAccepted()) {
+      ci.id = resp.getId();
+    }
+    return resp;
+  }
+
+  static AcidDirectory getAcidDirectory(StorageDescriptor sd, ValidWriteIdList writeIds, HiveConf conf) throws IOException {
+    Path location = new Path(sd.getLocation());
+    FileSystem fs = location.getFileSystem(conf);
+    return AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
+  }
+
+  public static CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir,
+      Map<String, String> tblProperties, long baseSize, long deltaSize, HiveConf conf) {
+    boolean noBase = false;
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+    if (baseSize == 0 && deltaSize > 0) {
+      noBase = true;
+    } else {
+      String deltaPctProp =
+          tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+      float deltaPctThreshold = deltaPctProp == null ? HiveConf.getFloatVar(conf,
+          HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) : Float.parseFloat(deltaPctProp);
+      boolean bigEnough = (float) deltaSize / (float) baseSize > deltaPctThreshold;
+      boolean multiBase = dir.getObsolete().stream().anyMatch(path -> path.getName().startsWith(AcidUtils.BASE_PREFIX));
+
+      boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
+      if (LOG.isDebugEnabled()) {
+        StringBuilder msg = new StringBuilder("delta size: ");
+        msg.append(deltaSize);
+        msg.append(" base size: ");
+        msg.append(baseSize);
+        msg.append(" multiBase ");
+        msg.append(multiBase);
+        msg.append(" deltaSize ");
+        msg.append(deltaSize);
+        msg.append(" threshold: ");
+        msg.append(deltaPctThreshold);
+        msg.append(" delta/base ratio > ").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
+            .append(": ");
+        msg.append(bigEnough);
+        msg.append(".");
+        if (!initiateMajor) {
+          msg.append("not");
+        }
+        msg.append(" initiating major compaction.");
+        LOG.debug(msg.toString());
+      }
+      if (initiateMajor)
+        return CompactionType.MAJOR;
+    }
+
+    String deltaNumProp =
+        tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+    int deltaNumThreshold = deltaNumProp == null ? HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) : Integer.parseInt(deltaNumProp);
+    boolean enough = deltas.size() > deltaNumThreshold;
+    if (!enough) {
+      LOG.debug("Not enough deltas to initiate compaction for table=" + ci.tableName + "partition=" + ci.partName
+          + ". Found: " + deltas.size() + " deltas, threshold is " + deltaNumThreshold);
+      return null;
+    }
+    // If there's no base file, do a major compaction
+    LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" : "has") + " base," + "requesting "
+        + (noBase ? "major" : "minor") + " compaction");
+
+    return noBase || !CompactorUtil.isMinorCompactionSupported(conf, tblProperties, dir) ? CompactionType.MAJOR : CompactionType.MINOR;
+  }
+
+  public static long getBaseSize(AcidDirectory dir) throws IOException {
+    long baseSize = 0;
+    if (dir.getBase() != null) {
+      baseSize = getDirSize(dir.getFs(), dir.getBase());
+    } else {
+      for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) {
+        baseSize += origStat.getFileStatus().getLen();
+      }
+    }
+    return baseSize;
+  }
+
+  public static long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) throws IOException {
+    return dir.getFiles(fs, Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
+        .mapToLong(FileStatus::getLen).sum();
+  }
+
+  public static CompactionType checkForCompaction(final CompactionInfo ci, final ValidWriteIdList writeIds,
+      final StorageDescriptor sd, final Map<String, String> tblProperties, final String runAs, TxnStore txnHandler, HiveConf conf)
+      throws IOException, InterruptedException {
+    // If it's marked as too many aborted, we already know we need to compact
+    if (ci.tooManyAborts) {
+      LOG.debug("Found too many aborted transactions for " + ci.getFullPartitionName() + ", "
+          + "initiating major compaction");
+      return CompactionType.MAJOR;
+    }
+
+    if (ci.hasOldAbort) {
+      HiveConf.ConfVars oldAbortedTimeoutProp = HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
+      LOG.debug("Found an aborted transaction for " + ci.getFullPartitionName() + " with age older than threshold "
+          + oldAbortedTimeoutProp + ": " + conf.getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
+          + "Initiating minor compaction.");
+      return CompactionType.MINOR;
+    }
+    AcidDirectory acidDirectory = getAcidDirectory(sd, writeIds, conf);
+    long baseSize = getBaseSize(acidDirectory);
+    FileSystem fs = acidDirectory.getFs();
+    Map<Path, Long> deltaSizes = new HashMap<>();
+    for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
+      deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
+    }
+    long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
+    AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, ci.partName, conf, txnHandler, baseSize,
+        deltaSizes, acidDirectory.getObsolete());
+
+    if (CompactorUtil.runJobAsSelf(runAs)) {
+      return determineCompactionType(ci, acidDirectory, tblProperties, baseSize, deltaSize, conf);
+    } else {
+      LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName());
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs, UserGroupInformation.getLoginUser());
+      CompactionType compactionType;
+      try {
+        compactionType = ugi.doAs(
+            (PrivilegedExceptionAction<CompactionType>) () -> determineCompactionType(ci, acidDirectory, tblProperties,
+                baseSize, deltaSize, conf));
+      } finally {
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + ci.getFullPartitionName(),
+              exception);
+        }
+      }
+      return compactionType;
+    }
+  }
+
+  public static ValidWriteIdList resolveValidWriteIds(Table t, TxnStore txnHandler, HiveConf conf)
+      throws NoSuchTxnException, MetaException {
+    ValidTxnList validTxnList = new ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
+    // The response will have one entry per table and hence we get only one ValidWriteIdList
+    String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
+    GetValidWriteIdsRequest validWriteIdsRequest = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+    validWriteIdsRequest.setValidTxnList(validTxnList.writeToString());
+
+    return TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(validWriteIdsRequest).getTblValidWriteIds().get(0));
+  }
+
+  public static CompactionResponse scheduleCompactionIfRequired(CompactionInfo ci, Table t,
+      Partition p, String runAs, boolean metricsEnabled, String hostName, TxnStore txnHandler, HiveConf conf)
+      throws MetaException {
+    StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(t, p);
+    try {
+      ValidWriteIdList validWriteIds = resolveValidWriteIds(t,txnHandler, conf);
+
+      CompactorUtil.checkInterrupt(Initiator.class.getName());
+
+      CompactionType type = checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs, txnHandler, conf);
+      if (type != null) {
+        ci.type = type;
+        return requestCompaction(ci, runAs,hostName, ci.initiatorVersion, txnHandler);
+      }
+    } catch (InterruptedException e) {
+      //Handle InterruptedException separately so the compactionInfo won't be marked as failed.
+      LOG.info("Initiator pool is being shut down, task received interruption.");
+    } catch (Throwable ex) {
+      String errorMessage = "Caught exception while trying to determine if we should compact " + ci + ". Marking "
+          + "failed to avoid repeated failures, " + ex;
+      LOG.error(errorMessage);
+      ci.errorMessage = errorMessage;
+      if (metricsEnabled) {
+        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc();
+      }
+      txnHandler.markFailed(ci);
+    }
+    return null;
+  }
+
+  public static CompactionResponse initiateCompactionForPartition(Table table, Partition partition,
+      CompactionRequest compactionRequest,String hostName, TxnStore txnHandler, HiveConf conf) throws MetaException {
+    ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    CompactionResponse compactionResponse;
+    CompactionInfo compactionInfo =
+        new CompactionInfo(table.getDbName(), table.getTableName(), compactionRequest.getPartitionname(),
+            compactionRequest.getType());
+    compactionInfo.initiatorId = compactionRequest.getInitiatorId();
+    compactionInfo.orderByClause = compactionRequest.getOrderByClause();
+    compactionInfo.initiatorVersion = compactionRequest.getInitiatorVersion();
+    if (compactionRequest.getNumberOfBuckets() > 0) {
+      compactionInfo.numberOfBuckets = compactionRequest.getNumberOfBuckets();
+    }
+    compactionInfo.poolName = compactionRequest.getPoolName();
+    try {
+      StorageDescriptor sd = CompactorUtil.resolveStorageDescriptor(table, partition);
+      String runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
+      LOG.info("Checking to see if we should compact partition {} of table {}.{}",
+          compactionInfo.partName, table.getDbName(), table.getTableName());
+      compactionResponse = CompactorUtil.scheduleCompactionIfRequired(compactionInfo, table, partition, runAs, false, hostName, txnHandler, conf);
+    } catch (IOException | InterruptedException | MetaException e) {
+      LOG.error("Error occurred while Checking if we should compact partition {} of table {}.{} Exception: {}",
+          compactionInfo.partName, table.getDbName(), table.getTableName(), e.getMessage());
+      throw new RuntimeException(e);
+    }
+    return compactionResponse;
+  }
+
+  public static CompactionResponse initiateCompactionForTable(CompactionRequest request, TxnStore txnHandler) throws MetaException {
+    return txnHandler.compact(request);
Review Comment:
The code before this refactoring was not failing, so I am trying to understand what is causing this change in behavior. I don't want behavior changes to slip into the code base accidentally. Can you please elaborate on why it was necessary before and why it shouldn't be done now?
Other than that, if the method is simply a call to `txnHandler.compact(request)`, it is not really necessary to wrap it in a utility. Using `txnHandler.compact(request)` directly is shorter and more explicit.
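In other words, at the call site something like this (sketch, assuming a `TxnStore` named `txnHandler` is already in scope there):

```java
// Call the TxnStore directly rather than going through a one-line wrapper method.
CompactionResponse response = txnHandler.compact(request);
```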
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java:
##########
@@ -32,23 +38,44 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+
+
Review Comment:
nit: Is there a style guideline that mandates multiple blank lines between imports? I don't understand by what criterion they are separated.
##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java:
##########
@@ -35,27 +37,37 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.txn.compactor.InitiatorBase;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.Optional;
 import static org.apache.hadoop.hive.ql.io.AcidUtils.compactionTypeStr2ThriftType;
 /**
  * Operation process of compacting a table.
  */
 public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDesc> {
+
   public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompactDesc desc) {
     super(context, desc);
   }
   @Override public int execute() throws Exception {
+    TxnStore txnHandler;
     Table table = context.getDb().getTable(desc.getTableName());
     if (!AcidUtils.isTransactionalTable(table) && !AcidUtils.isNonNativeAcidTable(table)) {
       throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, table.getDbName(), table.getTableName());
     }
+    Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
+        convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
+
+    txnHandler = TxnUtils.getTxnStore(context.getConf());
Review Comment:
nit: The `txnHandler` declaration could be moved here. It is good practice to declare variables as close as possible to where they are used (see Item 57, "Minimize the scope of local variables", in Effective Java).
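Something like this (sketch, reusing the calls already in the diff):

```java
// Declare and initialize txnHandler right before its first use instead of at the top of execute().
Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
    convertPartitionsFromThriftToDB(getPartitions(table, desc, context));

TxnStore txnHandler = TxnUtils.getTxnStore(context.getConf());
```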