difin commented on code in PR #5886: URL: https://github.com/apache/hive/pull/5886#discussion_r2176318613
########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/TableOptimizer.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; + +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Comparator; +import java.util.List; +import java.util.LongSummaryStatistics; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; + +public abstract class TableOptimizer { + static final private String CLASS_NAME = AcidTableOptimizer.class.getName(); + static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + + public abstract Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions, + Set<String> skipDBs, Set<String> skipTables) throws MetaException; + + protected final HiveConf conf; + protected final TxnStore txnHandler; + protected final MetadataCache metadataCache; + + public TableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) { + this.conf = conf; + this.txnHandler = txnHandler; + this.metadataCache = metadataCache; + } + + protected boolean isEligibleForCompaction(CompactionInfo ci, ShowCompactResponse currentCompactions, + Set<String> skipDBs, Set<String> skipTables) { + try { + if (skipDBs.contains(ci.dbname)) { + LOG.info("Skipping {}::{}, skipDBs::size:{}", ci.dbname, ci.tableName, skipDBs.size()); + return false; + } else { + if (replIsCompactionDisabledForDatabase(ci.dbname)) { + skipDBs.add(ci.dbname); + LOG.info("Skipping {} as compaction is disabled due to repl; skipDBs::size:{}", + ci.dbname, skipDBs.size()); + return false; + } + } + + if (skipTables.contains(ci.getFullTableName())) { + return false; + } + + LOG.info("Checking to see if we should compact {}", ci.getFullPartitionName()); + + // Check if we have already initiated or are working on a compaction for this table/partition. + // Also make sure we haven't exceeded configured number of consecutive failures. + // If any of the above applies, skip it. + // Note: if we are just waiting on cleaning we can still check, as it may be time to compact again even though we haven't cleaned. + if (foundCurrentOrFailedCompactions(currentCompactions, ci)) { + return false; + } + + Table t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> + CompactorUtil.resolveTable(conf, ci.dbname, ci.tableName)); + if (t == null) { + LOG.info("Can't find table {}, assuming it's a temp table or has been dropped and moving on.", + ci.getFullTableName()); + return false; + } + + if (replIsCompactionDisabledForTable(t)) { + skipTables.add(ci.getFullTableName()); + return false; + } + + Map<String, String> dbParams = metadataCache.computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters(); + if (MetaStoreUtils.isNoAutoCompactSet(dbParams, t.getParameters())) { + if (Boolean.parseBoolean(MetaStoreUtils.getNoAutoCompact(dbParams))) { + skipDBs.add(ci.dbname); + LOG.info("DB {} marked {}=true so we will not compact it.", hive_metastoreConstants.NO_AUTO_COMPACT, ci.dbname); + } else { + skipTables.add(ci.getFullTableName()); + LOG.info("Table {} marked {}=true so we will not compact it.", hive_metastoreConstants.NO_AUTO_COMPACT, + Warehouse.getQualifiedName(t)); + } + return false; + } + } catch (Throwable e) { + LOG.error("Caught exception while checking compaction eligibility.", e); + try { + ci.errorMessage = e.getMessage(); + txnHandler.markFailed(ci); + } catch (MetaException ex) { + LOG.error("Caught exception while marking compaction as failed.", e); + } + return false; + } + return true; + } + + private boolean replIsCompactionDisabledForTable(Table tbl) { + // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. + boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters()); + if (isCompactDisabled) { + LOG.info("Compaction is disabled for table {}", tbl.getTableName()); + } + return isCompactDisabled; + } + + private boolean replIsCompactionDisabledForDatabase(String dbName) throws TException { + try { + Database database = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), dbName); + // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail. + boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters()); + if (isReplCompactDisabled) { + LOG.info("Compaction is disabled for database {}", dbName); + } + return isReplCompactDisabled; + } catch (NoSuchObjectException e) { + LOG.info("Unable to find database {}", dbName); + return true; + } + } + + protected boolean foundCurrentOrFailedCompactions(ShowCompactResponse compactions, CompactionInfo ci) throws MetaException { + if (compactions.getCompacts() == null) { + return false; + } + + //In case of an aborted Dynamic partition insert, the created entry in the compaction queue does not contain + //a partition name even for partitioned tables. As a result it can happen that the ShowCompactResponse contains + //an element without partition name for partitioned tables. Therefore, it is necessary to null check the partition + //name of the ShowCompactResponseElement even if the CompactionInfo.partName is not null. These special compaction + //requests are skipped by the worker, and only cleaner will pick them up, so we should allow to schedule a 'normal' + //compaction for partitions of those tables which has special (DP abort) entry with undefined partition name. + List<ShowCompactResponseElement> filteredElements = compactions.getCompacts().stream() + .filter(e -> e.getDbname().equals(ci.dbname) + && e.getTablename().equals(ci.tableName) + && (e.getPartitionname() == null && ci.partName == null || + (Objects.equals(e.getPartitionname(),ci.partName)))) + .toList(); + + // Figure out if there are any currently running compactions on the same table or partition. + if (filteredElements.stream().anyMatch( + e -> TxnStore.WORKING_RESPONSE.equals(e.getState()) || TxnStore.INITIATED_RESPONSE.equals(e.getState()))) { + + LOG.info("Found currently initiated or working compaction for {} so we will not initiate another compaction", + ci.getFullPartitionName()); + return true; + } + + // Check if there is already sufficient number of consecutive failures for this table/partition + // so that no new automatic compactions needs to be scheduled. + int failedThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); + + LongSummaryStatistics failedStats = filteredElements.stream() + .filter(e -> TxnStore.SUCCEEDED_RESPONSE.equals(e.getState()) || TxnStore.FAILED_RESPONSE.equals(e.getState())) + .sorted(Comparator.comparingLong(ShowCompactResponseElement::getId).reversed()) + .limit(failedThreshold) + + .filter(e -> TxnStore.FAILED_RESPONSE.equals(e.getState())) + .collect(Collectors.summarizingLong(ShowCompactResponseElement::getEnqueueTime)); + + // If the last attempt was too long ago, ignore the failed threshold and try compaction again + long retryTime = MetastoreConf.getTimeVar(conf, + MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_RETRY_TIME, TimeUnit.MILLISECONDS); + + boolean needsRetry = (retryTime > 0) && (failedStats.getMax() + retryTime < System.currentTimeMillis()); + if (failedStats.getCount() == failedThreshold && !needsRetry) { + LOG.warn("Will not initiate compaction for {} since last {} attempts to compact it failed.", + ci.getFullPartitionName(), MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); + + ci.errorMessage = "Compaction is not initiated since last " + + MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " consecutive compaction attempts failed)"; + + txnHandler.markFailed(ci); + return true; + } + return false; + } + + protected Database resolveDatabase(CompactionInfo ci) throws MetaException, NoSuchObjectException { + return CompactorUtil.resolveDatabase(conf, ci.dbname); + } + + protected boolean isCacheEnabled() { + return MetastoreConf.getBoolVar(conf, + MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON); + } Review Comment: No, removed. Strangely, my IDE wasn't showing that it was unused. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For additional commands, e-mail: gitbox-h...@hive.apache.org