[ https://issues.apache.org/jira/browse/HIVE-27020?focusedWorklogId=851952&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-851952 ]
ASF GitHub Bot logged work on HIVE-27020:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Mar/23 08:26
            Start Date: 21/Mar/23 08:26
    Worklog Time Spent: 10m
      Work Description: veghlaci05 commented on code in PR #4091:
URL: https://github.com/apache/hive/pull/4091#discussion_r1143022684


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.AcidTxnInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.isNull;
+
+/**
+ * Abort-cleanup based implementation of TaskHandler.
+ * Provides the implementation for creating abort-cleanup tasks.
+ */
+class AbortedTxnCleaner extends AcidTxnCleaner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbortedTxnCleaner.class.getName());
+
+  public AbortedTxnCleaner(HiveConf conf, TxnStore txnHandler,
+                           MetadataCache metadataCache, boolean metricsEnabled,
+                           FSRemover fsRemover) {
+    super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
+  }
+
+  /**
+   This cleanup is based on the following idea - <br>
+   1. Aborted cleanup is independent of compaction. This is because directories which are written by
+   aborted txns are not visible to any open txns. They are only visible while determining the AcidState (which
+   only sees the aborted deltas and does not read the files).<br><br>
+
+   The following algorithm is used to clean the set of aborted directories - <br>
+   a. Find the list of entries which are suitable for cleanup (this is done in {@link TxnStore#findReadyToCleanForAborts(long, int)}).<br>
+   b. If the table/partition does not exist, then remove the associated aborted entry in the TXN_COMPONENTS table. <br>
+   c. Get the AcidState of the table by using the min open txnID, database name, table name, partition name and highest write ID. <br>
+   d. Fetch the aborted directories and delete them. <br>
+   e. Fetch the aborted write IDs from the AcidState and use them to delete the associated metadata in the TXN_COMPONENTS table.
+   **/
+  @Override
+  public List<Runnable> getTasks() throws MetaException {
+    int abortedThreshold = HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+    long abortedTimeThreshold = HiveConf
+        .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
+            TimeUnit.MILLISECONDS);
+    List<AcidTxnInfo> readyToCleanAborts = txnHandler.findReadyToCleanForAborts(abortedTimeThreshold, abortedThreshold);
+
+    if (!readyToCleanAborts.isEmpty()) {
+      return readyToCleanAborts.stream().map(ci -> ThrowingRunnable.unchecked(() ->
+              clean(ci, ci.txnId > 0 ? ci.txnId : Long.MAX_VALUE, metricsEnabled)))
+          .collect(Collectors.toList());
+    }
+    return Collections.emptyList();
+  }
+
+  private void clean(AcidTxnInfo info, long minOpenTxn, boolean metricsEnabled) throws MetaException {
+    LOG.info("Starting cleaning for {}", info);
+    PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
+    String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_";
+    try {
+      if (metricsEnabled) {
+        perfLogger.perfLogBegin(AbortedTxnCleaner.class.getName(), cleanerMetric);
+      }
+      Table t;
+      Partition p = null;
+      t = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
+      if (isNull(t)) {
+        // The table was dropped before we got around to cleaning it.
+        LOG.info("Unable to find table {}, assuming it was dropped.", info.getFullTableName());
+        txnHandler.markCleanedForAborts(info);
+        return;
+      }
+      if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+        // The table was marked with NO_CLEANUP set to true.
+        LOG.info("Skipping table {} clean up, as NO_CLEANUP set to true", info.getFullTableName());
+        return;
+      }
+      if (!isNull(info.partName)) {
+        p = resolvePartition(info.dbname, info.tableName, info.partName);
+        if (isNull(p)) {
+          // The partition was dropped before we got around to cleaning it.
+          LOG.info("Unable to find partition {}, assuming it was dropped.",
+              info.getFullPartitionName());
+          txnHandler.markCleanedForAborts(info);
+          return;
+        }
+        if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+          // The partition was marked with NO_CLEANUP set to true.
+          LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to true", info.getFullPartitionName());
+          return;
+        }
+      }
+
+      String location = CompactorUtil.resolveStorageDescriptor(t, p).getLocation();
+      info.runAs = TxnUtils.findUserToRunAs(location, t, conf);
+      abortCleanUsingAcidDir(info, location, minOpenTxn);
+
+    } catch (Exception e) {
+      LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", info,
+          e.getMessage());
+      throw new MetaException(e.getMessage());
+    } finally {
+      if (metricsEnabled) {
+        perfLogger.perfLogEnd(AbortedTxnCleaner.class.getName(), cleanerMetric);

Review Comment:
   I don't think it's worth doing:
   1. It's a metric, not a critical component or code path.
   2. Adding a retry (especially with back-off) would possibly corrupt the metric's value, as the perfLogEnd call would no longer be in sync with the completion time of the operation it measures.
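To make the reviewer's second point concrete, here is a minimal, self-contained sketch (not part of the PR; the flaky sink, class name, and timings are all invented for illustration) showing how a retry with exponential back-off around the end-of-cycle metrics call inflates the measured duration:

```java
// Hypothetical sketch: the metric is derived from when the "end" call
// finally succeeds, not from when the measured work actually finished.
public class RetrySkewSketch {

  private static int attempts = 0;

  // Simulated flaky metrics sink: fails the first two attempts.
  private static void flakyPerfLogEnd() {
    if (++attempts < 3) {
      throw new RuntimeException("transient metrics failure");
    }
  }

  public static void main(String[] args) throws InterruptedException {
    long start = System.currentTimeMillis();

    Thread.sleep(100); // the actual cleanup work: roughly 100 ms

    long workDone = System.currentTimeMillis();

    // Hypothetical retry with exponential back-off around the "end" call.
    long backoffMs = 200;
    while (true) {
      try {
        flakyPerfLogEnd();
        break;
      } catch (RuntimeException e) {
        Thread.sleep(backoffMs); // 200 ms, then 400 ms, ...
        backoffMs *= 2;
      }
    }

    long recordedEnd = System.currentTimeMillis();
    System.out.printf("actual work: %d ms, recorded cycle: %d ms%n",
        workDone - start, recordedEnd - start);
    // Prints roughly "actual work: 100 ms, recorded cycle: 700 ms" --
    // the back-off sleeps get attributed to the cleaner cycle itself.
  }
}
```

The work takes about 100 ms, but the recorded cycle time absorbs the 600 ms of back-off, which is exactly the desynchronization the comment describes.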
Issue Time Tracking
-------------------

            Worklog Id:     (was: 851952)
            Time Spent: 3h 10m  (was: 3h)

> Implement a separate handler to handle aborted transaction cleanup
> ------------------------------------------------------------------
>
>                 Key: HIVE-27020
>                 URL: https://issues.apache.org/jira/browse/HIVE-27020
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Sourabh Badhya
>            Assignee: Sourabh Badhya
>            Priority: Major
>              Labels: pull-request-available
>            Time Spent: 3h 10m
>    Remaining Estimate: 0h
>
> As described in the parent task, once the cleaner is separated into different
> entities, implement a separate handler which can create requests for aborted
> transaction cleanup. This would move the aborted transaction cleanup
> exclusively to the cleaner.
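For context on the issue description, a rough sketch of the handler separation under discussion, assuming only the shape visible in the diff above (a handler exposing getTasks() that returns List<Runnable>); the simplified TaskHandler interface and the CleanerLoop class here are illustrative stand-ins, not Hive's actual classes:

```java
import java.util.List;

// Simplified stand-in for Hive's TaskHandler; the real class is richer
// (it carries conf, txnHandler, metadataCache, and so on).
interface TaskHandler {
  List<Runnable> getTasks() throws Exception;
}

// Hypothetical cleaner loop: each handler contributes its own cleanup
// tasks, so aborted-txn cleanup no longer rides on compaction requests.
class CleanerLoop {

  private final List<TaskHandler> handlers;

  CleanerLoop(List<TaskHandler> handlers) {
    this.handlers = handlers;
  }

  // One cleaner cycle: every handler is polled for ready-to-clean work.
  void runOneCycle() {
    for (TaskHandler handler : handlers) {
      try {
        for (Runnable task : handler.getTasks()) {
          task.run(); // the real Cleaner would submit these to an executor
        }
      } catch (Exception e) {
        // A failing handler must not starve the others.
        System.err.println("handler failed: " + e.getMessage());
      }
    }
  }
}
```

Under this shape, AbortedTxnCleaner is simply one more handler in the list, which is what "move the aborted transaction cleanup exclusively to the cleaner" amounts to.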