[ https://issues.apache.org/jira/browse/HIVE-22977?focusedWorklogId=840384&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-840384 ]
ASF GitHub Bot logged work on HIVE-22977:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jan/23 17:39
            Start Date: 19/Jan/23 17:39
    Worklog Time Spent: 10m
      Work Description: deniskuzZ commented on code in PR #3801:
URL: https://github.com/apache/hive/pull/3801#discussion_r1081607812


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MergeCompactor.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+final class MergeCompactor extends QueryCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MergeCompactor.class.getName());
+
+  @Override
+  public boolean run(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo, AcidDirectory dir)
+      throws IOException, HiveException, InterruptedException {
+    if (isMergeCompaction(hiveConf, dir, writeIds, storageDescriptor)) {
+      // Only inserts happened; it is much more performant to merge the files than to run a query.
+      Path outputDirPath = getCompactionOutputDirPath(hiveConf, writeIds,
+          compactionInfo.isMajorCompaction(), storageDescriptor);
+      try {
+        return mergeOrcFiles(hiveConf, compactionInfo.isMajorCompaction(),
+            dir, outputDirPath, AcidUtils.isInsertOnlyTable(table.getParameters()));
+      } catch (Throwable t) {
+        // Error handling: just delete the output directory
+        // and fall back to query-based compaction.
+        FileSystem fs = outputDirPath.getFileSystem(hiveConf);
+        if (fs.exists(outputDirPath)) {
+          fs.delete(outputDirPath, true);
+        }
+        return false;
+      }
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Returns whether merge compaction must be enabled or not.
+   * @param conf Hive configuration
+   * @param directory the directory to be scanned
+   * @param validWriteIdList list of valid write IDs
+   * @param storageDescriptor storage descriptor of the underlying table
+   * @return true, if merge compaction must be enabled
+   */
+  private boolean isMergeCompaction(HiveConf conf, AcidDirectory directory,
+      ValidWriteIdList validWriteIdList,
+      StorageDescriptor storageDescriptor) {
+    return conf.getBoolVar(HiveConf.ConfVars.HIVE_MERGE_COMPACTION_ENABLED)
+        && storageDescriptor.getOutputFormat().equalsIgnoreCase("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")
+        && !hasDeleteOrAbortedDirectories(directory, validWriteIdList);
+  }
+
+  /**
+   * Scans a directory for delete deltas or aborted directories.
+   * @param directory the directory to be scanned
+   * @param validWriteIdList list of valid write IDs
+   * @return true, if a delete delta or an aborted directory is found
+   */
+  private boolean hasDeleteOrAbortedDirectories(AcidDirectory directory, ValidWriteIdList validWriteIdList) {
+    if (!directory.getCurrentDirectories().isEmpty()) {
+      final long minWriteId = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
+      final long maxWriteId = validWriteIdList.getHighWatermark();
+      return directory.getCurrentDirectories().stream()
+          .filter(AcidUtils.ParsedDeltaLight::isDeleteDelta)
+          .filter(delta -> delta.getMinWriteId() >= minWriteId)
+          .anyMatch(delta -> delta.getMaxWriteId() <= maxWriteId) || !directory.getAbortedDirectories().isEmpty();
+    }
+    return true;
+  }
+
+  /**
+   * Collects the list of all bucket file paths that belong to the same bucket ID. This method scans all the base
+   * and delta dirs.
+   * @param conf Hive configuration, must not be null
+   * @param dir the root directory of delta dirs
+   * @param includeBaseDir true, if the base directory should be scanned
+   * @param isMm true, if the table is insert-only (MM table)
+   * @return map of bucket ID -> bucket files
+   * @throws IOException an error happened during the reading of the directory/bucket file
+   */
+  private Map<Integer, List<Reader>> matchBucketIdToBucketFiles(HiveConf conf, AcidDirectory dir,
+      boolean includeBaseDir, boolean isMm) throws IOException {
+    Map<Integer, List<Reader>> result = new HashMap<>();
+    if (includeBaseDir && dir.getBaseDirectory() != null) {
+      getBucketFiles(conf, dir.getBaseDirectory(), isMm, result);
+    }
+    for (AcidUtils.ParsedDelta deltaDir : dir.getCurrentDirectories()) {
+      Path deltaDirPath = deltaDir.getPath();
+      getBucketFiles(conf, deltaDirPath, isMm, result);

Review Comment:
   Please refactor the `getBucketFiles` method; it is supposed to return the map:
   ```
   result.putAll(getBucketFiles(conf, deltaDirPath, isMm));
   ```
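For clarity, here is a minimal sketch of the shape that refactor could take, assuming `getBucketFiles` currently fills the caller-supplied map in place. The bucket-name pattern and reader construction are illustrative guesses, not the actual PR code (it would also need a `java.util.regex.Pattern` import in addition to the existing ones):

```java
// Hypothetical refactor: build and return a fresh map instead of mutating an
// out-parameter, so every call site reduces to result.putAll(...).
private Map<Integer, List<Reader>> getBucketFiles(HiveConf conf, Path dirPath, boolean isMm)
    throws IOException {
  // Assumed naming scheme for bucket files, e.g. bucket_00000 or bucket_00000_0.
  // isMm (insert-only table) is kept for signature parity; MM tables may name
  // their files differently than this pattern assumes.
  Pattern bucketPattern = Pattern.compile("bucket_(\\d+).*");
  Map<Integer, List<Reader>> bucketFiles = new HashMap<>();
  FileSystem fs = dirPath.getFileSystem(conf);
  for (FileStatus file : fs.listStatus(dirPath)) {
    Matcher matcher = bucketPattern.matcher(file.getPath().getName());
    if (matcher.matches()) {
      int bucketId = Integer.parseInt(matcher.group(1));
      // Open an ORC reader for the bucket file and group it under its bucket ID.
      Reader reader = OrcFile.createReader(file.getPath(), OrcFile.readerOptions(conf));
      bucketFiles.computeIfAbsent(bucketId, id -> new ArrayList<>()).add(reader);
    }
  }
  return bucketFiles;
}
```

With that shape, both call sites in `matchBucketIdToBucketFiles` become a one-liner: `result.putAll(getBucketFiles(conf, deltaDirPath, isMm));`.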
Issue Time Tracking
-------------------

    Worklog Id:     (was: 840384)
    Time Spent: 6h 20m  (was: 6h 10m)

> Merge delta files instead of running a query in major/minor compaction
> ----------------------------------------------------------------------
>
>                 Key: HIVE-22977
>                 URL: https://issues.apache.org/jira/browse/HIVE-22977
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: László Pintér
>            Assignee: Sourabh Badhya
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-22977.01.patch, HIVE-22977.02.patch
>
>          Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> [Compaction Optimization]
> We should analyse the possibility of moving delta files instead of running a
> major/minor compaction query.
> Please consider the following use cases:
> - full ACID table, but only insert queries were run. This means that no delete
>   delta directories were created. Is it possible to merge the delta directory
>   contents without running a compaction query?
> - full ACID table, initiating queries through the streaming API. If there are
>   no aborted transactions during streaming, is it possible to merge the delta
>   directory contents without running a compaction query?
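Both questions come down to whether insert-only delta files can be concatenated at the ORC stripe level. Below is a minimal, self-contained sketch of that technique using the public ORC API (assuming ORC 1.5+, where `org.apache.orc.OrcFile.mergeFiles` is available); the paths are made up, and this illustrates the idea rather than the actual patch:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class DeltaMergeSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Hypothetical bucket files from two insert-only deltas: no delete deltas
    // and no aborted transactions, matching the preconditions described above.
    List<Path> inputs = Arrays.asList(
        new Path("/warehouse/t/delta_0000001_0000001/bucket_00000"),
        new Path("/warehouse/t/delta_0000002_0000002/bucket_00000"));
    Path merged = new Path("/warehouse/t/base_0000002/bucket_00000");
    // mergeFiles concatenates compatible ORC files stripe by stripe, without
    // decoding and re-encoding rows; it returns the inputs it actually merged
    // (files with an incompatible schema or compression are skipped).
    List<Path> mergedInputs = OrcFile.mergeFiles(merged, OrcFile.writerOptions(conf), inputs);
    System.out.println("Merged " + mergedInputs.size() + " of " + inputs.size() + " files");
  }
}
```

Because no rows are materialized, this should be far cheaper than the query-based compaction path whenever those preconditions hold.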
--
This message was sent by Atlassian Jira
(v8.20.10#820010)