[ 
https://issues.apache.org/jira/browse/HIVE-22977?focusedWorklogId=837445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-837445
 ]

ASF GitHub Bot logged work on HIVE-22977:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Jan/23 09:42
            Start Date: 06/Jan/23 09:42
    Worklog Time Spent: 10m 
      Work Description: veghlaci05 commented on code in PR #3801:
URL: https://github.com/apache/hive/pull/3801#discussion_r1063258058


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorChain.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Runs different compactions based on the order provided in the list.
+ * Mainly used for fall back mechanism for Merge compaction.
+ */
+final class CompactorChain implements Compactor {
+
+  private final List<Compactor> compactors;
+
+  CompactorChain(List<Compactor> compactors) {
+    this.compactors = compactors;

Review Comment:
   Please either add a null check here, or instantiate the list above and use 
addAll, to prevent an NPE in the run method.
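
   A minimal sketch of the two options, combined in one constructor. The names
`SimpleCompactor` and `SimpleCompactorChain` are hypothetical stand-ins with a
no-argument `run()`, not the PR's actual classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for the PR's Compactor interface, reduced to a no-argument run().
interface SimpleCompactor {
  boolean run();
}

final class SimpleCompactorChain implements SimpleCompactor {

  // Instantiated up front, so run() never sees a null list.
  private final List<SimpleCompactor> compactors = new ArrayList<>();

  SimpleCompactorChain(List<SimpleCompactor> compactors) {
    // Option 1: fail fast in the constructor with a descriptive message.
    Objects.requireNonNull(compactors, "compactors must not be null");
    // Option 2: copy into the pre-built list, so the field itself is never null.
    this.compactors.addAll(compactors);
  }

  @Override
  public boolean run() {
    // Try each compactor in order and stop at the first one that reports success.
    for (SimpleCompactor compactor : compactors) {
      if (compactor.run()) {
        return true;
      }
    }
    return false;
  }
}
```

   Either option alone would be enough. The copy also keeps later changes to
the caller's list out of the chain.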



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java:
##########
@@ -289,6 +289,23 @@ protected void insertMmTestDataPartitioned(String tblName) 
throws Exception {
           + "('5',4, 'today'),('6',2, 'today'),('6',3, 'today'),('6',4, 
'today')", driver);
     }
 
+    protected void insertTestData(String tblName, boolean isPartitioned) 
throws Exception {

Review Comment:
   I like this idea. What about making `insertTestDataPartitioned` and 
`insertTestData` private and adjusting the current calls? Feel free to 
ignore this if there are numerous usages.



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MergeCompactor.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import java.io.IOException;
+
+final class MergeCompactor extends QueryCompactor {
+
+  @Override
+  public boolean run(HiveConf hiveConf, Table table, Partition partition, 
StorageDescriptor storageDescriptor,
+                  ValidWriteIdList writeIds, CompactionInfo compactionInfo, 
AcidDirectory dir) throws IOException, HiveException, InterruptedException {
+    if (Util.isMergeCompaction(hiveConf, dir, writeIds, storageDescriptor)) {

Review Comment:
   If this is the only usage, you could move it to this class as a private 
method. Please do the same for every other method that is used only in 
MergeCompactor.



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -2951,4 +2952,170 @@ public void testStatsAfterCompactionPartTbl(boolean 
isQueryBased, boolean isAuto
     Assert.assertEquals("The number of rows is differing from the expected", 
"2", parameters.get("numRows"));
     executeStatementOnDriver("drop table if exists " + tblName, driver);
   }
+
+  @Test
+  public void testMajorCompactionWithMergeNotPartitionedWithoutBuckets() 
throws Exception {
+    testCompactionWithMerge(CompactionType.MAJOR, false, false, null, 
Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("base_0000003_v0000007"), true, true);
+  }
+
+  @Test
+  public void testMajorCompactionWithMergePartitionedWithoutBuckets() throws 
Exception {
+    testCompactionWithMerge(CompactionType.MAJOR, true, false, "ds=today", 
Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("base_0000003_v0000007"), true, true);
+  }
+
+  @Test
+  public void testMajorCompactionWithMergeNotPartitionedWithBuckets() throws 
Exception {
+    testCompactionWithMerge(CompactionType.MAJOR, false, true, null, 
Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("base_0000003_v0000007"), true, true);
+  }
+
+  @Test
+  public void testMajorCompactionWithMergerPartitionedWithBuckets() throws 
Exception {
+    testCompactionWithMerge(CompactionType.MAJOR, true, true, "ds=today", 
Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("base_0000003_v0000007"), true, true);
+  }
+
+  @Test
+  public void testMinorCompactionWithMergeNotPartitionedWithoutBuckets() 
throws Exception {
+    testCompactionWithMerge(CompactionType.MINOR, false, false, null,
+            Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("delta_0000001_0000003_v0000007"), true, 
true);
+  }
+
+  @Test
+  public void testMinorCompactionWithMergePartitionedWithoutBuckets() throws 
Exception {
+    testCompactionWithMerge(CompactionType.MINOR, true, false, "ds=today",
+            Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("delta_0000001_0000003_v0000007"), true, 
true);
+  }
+
+  @Test
+  public void testMinorCompactionWithMergeNotPartitionedWithBuckets() throws 
Exception {
+    testCompactionWithMerge(CompactionType.MINOR, false, true, null,
+            Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("delta_0000001_0000003_v0000007"), true, 
true);
+  }
+
+  @Test
+  public void testMinorCompactionWithMergePartitionedWithBuckets() throws 
Exception {
+    testCompactionWithMerge(CompactionType.MINOR, true, true, "ds=today",
+            Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("delta_0000001_0000003_v0000007"), true, 
true);
+  }
+
+  @Test
+  public void testMajorCompactionAfterMinorWithMerge() throws Exception {
+    testCompactionWithMerge(CompactionType.MINOR, true, true, "ds=today",
+            Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("delta_0000001_0000003_v0000007"),true, 
false);
+    testCompactionWithMerge(CompactionType.MAJOR, true, true, "ds=today",
+            Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000003_v0000007", 
"delta_0000004_0000004_0000", "delta_0000005_0000005_0000",
+                    "delta_0000006_0000006_0000"), 
Collections.singletonList("base_0000006_v0000014"), false, true);
+  }
+
+  @Test
+  public void testMinorCompactionAfterMajorWithMerge() throws Exception {
+    testCompactionWithMerge(CompactionType.MAJOR, false, false, null,
+            Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("base_0000003_v0000007"), true, false);
+    testCompactionWithMerge(CompactionType.MINOR, false, false, null,
+            Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000004_0000004_0000", 
"delta_0000005_0000005_0000", "delta_0000006_0000006_0000"),
+            Collections.singletonList("delta_0000001_0000006_v0000014"), 
false, true);
+  }
+
+  @Test
+  public void testMultipleMajorCompactionWithMerge() throws Exception {
+    testCompactionWithMerge(CompactionType.MAJOR, true, true, "ds=today",
+            Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("base_0000003_v0000007"), true, false);
+    testCompactionWithMerge(CompactionType.MAJOR, true, true, "ds=today",
+            Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000004_0000004_0000", 
"delta_0000005_0000005_0000", "delta_0000006_0000006_0000"),
+            Collections.singletonList("base_0000006_v0000014"), false, true);
+  }
+
+  @Test
+  public void testMultipleMinorCompactionWithMerge() throws Exception {
+    testCompactionWithMerge(CompactionType.MINOR, false, false, null,
+            Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("delta_0000001_0000003_v0000007"), true, 
false);
+    testCompactionWithMerge(CompactionType.MINOR, false, false, null,
+            Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000003_v0000007", 
"delta_0000004_0000004_0000", "delta_0000005_0000005_0000",
+                    "delta_0000006_0000006_0000"),
+            Collections.singletonList("delta_0000001_0000006_v0000014"), 
false, true);
+  }
+
+  private void testCompactionWithMerge(CompactionType compactionType, boolean 
isPartitioned, boolean isBucketed,
+                                       String partitionName, List<String> 
bucketName, List<String> deltaDirNames, List<String> compactDirNames,
+                                       boolean createTable, boolean dropTable) 
throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_MERGE_COMPACTION_ENABLED, true);
+    String dbName = "default";
+    String tableName = "testCompaction";
+    // Create test table
+    TestDataProvider dataProvider = new TestDataProvider();
+    if (createTable) {
+      dataProvider.createFullAcidTable(tableName, isPartitioned, isBucketed);
+    }
+    // Find the location of the table
+    IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+    Table table = metaStoreClient.getTable(dbName, tableName);
+    FileSystem fs = FileSystem.get(conf);
+    // Insert test data into test table
+    dataProvider.insertMmTestData(tableName, isPartitioned);

Review Comment:
   You are creating a full acid table, but inserting MM test data. Is this 
intended?



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/OrcFileMerger.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Class to support fast merging of ORC files.
+ */
+public class OrcFileMerger {
+
+  private final Configuration conf;
+  private static final Logger LOG = 
LoggerFactory.getLogger(OrcFileMerger.class);
+
+  public OrcFileMerger(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Merge orc files into a single file
+   * @param files list of orc file paths to be merged
+   * @param outPath the path of output orc file
+   * @throws IOException error happened during file operations
+   */
+  public void mergeFiles(List<Reader> readers, Path outPath) throws 
IOException {
+    Writer writer = null;
+    try {
+      for (Reader reader : readers) {
+        if (writer == null) {
+          writer = setupWriter(reader, outPath);
+        }
+        VectorizedRowBatch batch = reader.getSchema().createRowBatchV2();
+        RecordReader rows = reader.rows();
+        while (rows.nextBatch(batch)) {
+          if (batch != null) {
+            writer.addRowBatch(batch);
+          }
+        }
+        rows.close();
+      }
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }

Review Comment:
   Why not use try-with-resources for writer?
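
   A rough sketch of the shape this could take. `RecordingWriter` is a
hypothetical stand-in for the ORC writer and is assumed to be `AutoCloseable`.
The quoted code creates its writer lazily from the first reader, so the real
change would have to create it before the loop:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an ORC writer; it records rows and whether close() ran.
final class RecordingWriter implements AutoCloseable {
  final List<String> written = new ArrayList<>();
  boolean closed = false;

  void add(String row) {
    written.add(row);
  }

  @Override
  public void close() {
    closed = true;
  }
}

final class MergeSketch {
  // Copies every row of every input into the target writer.
  // try-with-resources closes the writer on both the normal and the exceptional path.
  static void merge(List<List<String>> inputs, RecordingWriter target) {
    try (RecordingWriter writer = target) {
      for (List<String> input : inputs) {
        for (String row : input) {
          writer.add(row);
        }
      }
    }
  }
}
```

   This removes the explicit `finally` block and the null check on the writer.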



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java:
##########
@@ -281,5 +283,146 @@ static void overrideConfProps(HiveConf conf, 
CompactionInfo ci, Map<String, Stri
                 conf.set(property, entry.getValue());
               });
     }
+
+    /**
+     * Returns whether merge compaction must be enabled or not.
+     * @param conf Hive configuration
+     * @param directory the directory to be scanned
+     * @param validWriteIdList list of valid write IDs
+     * @param storageDescriptor storage descriptor of the underlying table
+     * @return true, if merge compaction must be enabled
+     */
+    static boolean isMergeCompaction(HiveConf conf, AcidDirectory directory,
+                                     ValidWriteIdList validWriteIdList,
+                                     StorageDescriptor storageDescriptor) {
+      return conf.getBoolVar(HiveConf.ConfVars.HIVE_MERGE_COMPACTION_ENABLED)
+              && !hasDeleteOrAbortedDirectories(directory, validWriteIdList)

Review Comment:
   Please switch the last two conditions. Thanks to short-circuit evaluation, 
the expensive hasDeleteOrAbortedDirectories will then be called only if the 
other two are true.
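
   To illustrate the effect of the reordering, here is a small self-contained
sketch. The directory scan is replaced by a hypothetical stand-in that only
counts its invocations, and the config flag and output format are passed in
as plain parameters:

```java
final class ShortCircuitSketch {
  // Counts how often the simulated expensive directory scan actually runs.
  static int expensiveCalls = 0;

  // Hypothetical stand-in for hasDeleteOrAbortedDirectories(...).
  static boolean hasDeleteOrAbortedDirectories() {
    expensiveCalls++;
    return false;
  }

  // Cheap checks first: && stops evaluating at the first false operand,
  // so the scan only runs when merge is enabled and the format is ORC.
  static boolean isMergeCompaction(boolean mergeEnabled, String outputFormat) {
    return mergeEnabled
        && "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat".equalsIgnoreCase(outputFormat)
        && !hasDeleteOrAbortedDirectories();
  }
}
```

   Calling `equalsIgnoreCase` on the constant also tolerates a null output
format, which the original operand order does not.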



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java:
##########
@@ -281,5 +283,146 @@ static void overrideConfProps(HiveConf conf, 
CompactionInfo ci, Map<String, Stri
                 conf.set(property, entry.getValue());
               });
     }
+
+    /**
+     * Returns whether merge compaction must be enabled or not.
+     * @param conf Hive configuration
+     * @param directory the directory to be scanned
+     * @param validWriteIdList list of valid write IDs
+     * @param storageDescriptor storage descriptor of the underlying table
+     * @return true, if merge compaction must be enabled
+     */
+    static boolean isMergeCompaction(HiveConf conf, AcidDirectory directory,
+                                     ValidWriteIdList validWriteIdList,
+                                     StorageDescriptor storageDescriptor) {
+      return conf.getBoolVar(HiveConf.ConfVars.HIVE_MERGE_COMPACTION_ENABLED)
+              && !hasDeleteOrAbortedDirectories(directory, validWriteIdList)
+              && 
storageDescriptor.getOutputFormat().equalsIgnoreCase("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
+    }
+
+    /**
+     * Scan a directory for delete deltas or aborted directories.
+     * @param directory the directory to be scanned
+     * @param validWriteIdList list of valid write IDs
+     * @return true, if delete or aborted directory found
+     */
+    static boolean hasDeleteOrAbortedDirectories(AcidDirectory directory, 
ValidWriteIdList validWriteIdList) {
+      if (!directory.getCurrentDirectories().isEmpty()) {
+        final long minWriteId = validWriteIdList.getMinOpenWriteId() == null ? 
1 : validWriteIdList.getMinOpenWriteId();
+        final long maxWriteId = validWriteIdList.getHighWatermark();
+        return directory.getCurrentDirectories().stream()
+                .filter(AcidUtils.ParsedDeltaLight::isDeleteDelta)
+                .filter(delta -> delta.getMinWriteId() >= minWriteId)
+                .anyMatch(delta -> delta.getMaxWriteId() <= maxWriteId) || 
!directory.getAbortedDirectories().isEmpty();
+      }
+      return true;
+    }
+
+    /**
+     * Collect the list of all bucket file paths, which belong to the same 
bucket Id. This method scans all the base
+     * and delta dirs.
+     * @param conf hive configuration, must be not null
+     * @param dir the root directory of delta dirs
+     * @param includeBaseDir true, if the base directory should be scanned
+     * @param isMm
+     * @return map of bucket ID -> bucket files
+     * @throws IOException an error happened during the reading of the 
directory/bucket file
+     */
+    private static Map<Integer, List<Reader>> 
matchBucketIdToBucketFiles(HiveConf conf, AcidDirectory dir,
+                                                                       boolean 
includeBaseDir, boolean isMm) throws IOException {
+      Map<Integer, List<Reader>> result = new HashMap<>();
+      if (includeBaseDir && dir.getBaseDirectory() != null) {
+        getBucketFiles(conf, dir.getBaseDirectory(), isMm, result);
+      }
+      for (AcidUtils.ParsedDelta deltaDir : dir.getCurrentDirectories()) {
+        Path deltaDirPath = deltaDir.getPath();
+        getBucketFiles(conf, deltaDirPath, isMm, result);
+      }
+      return result;
+    }
+
+    /**
+     * Collect the list of all bucket file paths, which belong to the same 
bucket Id. This method checks only one
+     * directory.
+     * @param conf hive configuration, must be not null
+     * @param dirPath the directory to be scanned.
+     * @param isMm collect bucket files fron insert only directories
+     * @param bucketIdToBucketFilePath the result of the scan
+     * @throws IOException an error happened during the reading of the 
directory/bucket file
+     */
+    private static void getBucketFiles(HiveConf conf, Path dirPath, boolean 
isMm, Map<Integer, List<Reader>> bucketIdToBucketFilePath) throws IOException {
+      FileSystem fs = dirPath.getFileSystem(conf);
+      FileStatus[] fileStatuses =
+              fs.listStatus(dirPath, isMm ? AcidUtils.originalBucketFilter : 
AcidUtils.bucketFileFilter);
+      for (FileStatus f : fileStatuses) {
+        final Path fPath = f.getPath();
+        Matcher matcher = isMm ? AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN
+                .matcher(fPath.getName()) : 
AcidUtils.BUCKET_PATTERN.matcher(fPath.getName());
+        if (!matcher.find()) {
+          String errorMessage = String
+                  .format("Found a bucket file matching the bucket pattern! %s 
Matcher=%s", fPath.toString(),

Review Comment:
   It should be _**Could not** find a bucket file..._, shouldn't it?



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java:
##########
@@ -281,5 +283,146 @@ static void overrideConfProps(HiveConf conf, 
CompactionInfo ci, Map<String, Stri
                 conf.set(property, entry.getValue());
               });
     }
+
+    /**
+     * Returns whether merge compaction must be enabled or not.
+     * @param conf Hive configuration
+     * @param directory the directory to be scanned
+     * @param validWriteIdList list of valid write IDs
+     * @param storageDescriptor storage descriptor of the underlying table
+     * @return true, if merge compaction must be enabled
+     */
+    static boolean isMergeCompaction(HiveConf conf, AcidDirectory directory,
+                                     ValidWriteIdList validWriteIdList,
+                                     StorageDescriptor storageDescriptor) {
+      return conf.getBoolVar(HiveConf.ConfVars.HIVE_MERGE_COMPACTION_ENABLED)
+              && !hasDeleteOrAbortedDirectories(directory, validWriteIdList)
+              && 
storageDescriptor.getOutputFormat().equalsIgnoreCase("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
+    }
+
+    /**
+     * Scan a directory for delete deltas or aborted directories.
+     * @param directory the directory to be scanned
+     * @param validWriteIdList list of valid write IDs
+     * @return true, if delete or aborted directory found
+     */
+    static boolean hasDeleteOrAbortedDirectories(AcidDirectory directory, 
ValidWriteIdList validWriteIdList) {
+      if (!directory.getCurrentDirectories().isEmpty()) {
+        final long minWriteId = validWriteIdList.getMinOpenWriteId() == null ? 
1 : validWriteIdList.getMinOpenWriteId();
+        final long maxWriteId = validWriteIdList.getHighWatermark();
+        return directory.getCurrentDirectories().stream()
+                .filter(AcidUtils.ParsedDeltaLight::isDeleteDelta)
+                .filter(delta -> delta.getMinWriteId() >= minWriteId)
+                .anyMatch(delta -> delta.getMaxWriteId() <= maxWriteId) || 
!directory.getAbortedDirectories().isEmpty();
+      }
+      return true;
+    }
+
+    /**
+     * Collect the list of all bucket file paths, which belong to the same 
bucket Id. This method scans all the base
+     * and delta dirs.
+     * @param conf hive configuration, must be not null
+     * @param dir the root directory of delta dirs
+     * @param includeBaseDir true, if the base directory should be scanned
+     * @param isMm
+     * @return map of bucket ID -> bucket files
+     * @throws IOException an error happened during the reading of the 
directory/bucket file
+     */
+    private static Map<Integer, List<Reader>> 
matchBucketIdToBucketFiles(HiveConf conf, AcidDirectory dir,
+                                                                       boolean 
includeBaseDir, boolean isMm) throws IOException {
+      Map<Integer, List<Reader>> result = new HashMap<>();
+      if (includeBaseDir && dir.getBaseDirectory() != null) {
+        getBucketFiles(conf, dir.getBaseDirectory(), isMm, result);
+      }
+      for (AcidUtils.ParsedDelta deltaDir : dir.getCurrentDirectories()) {
+        Path deltaDirPath = deltaDir.getPath();
+        getBucketFiles(conf, deltaDirPath, isMm, result);
+      }
+      return result;
+    }
+
+    /**
+     * Collect the list of all bucket file paths, which belong to the same 
bucket Id. This method checks only one
+     * directory.
+     * @param conf hive configuration, must be not null
+     * @param dirPath the directory to be scanned.
+     * @param isMm collect bucket files fron insert only directories
+     * @param bucketIdToBucketFilePath the result of the scan
+     * @throws IOException an error happened during the reading of the 
directory/bucket file
+     */
+    private static void getBucketFiles(HiveConf conf, Path dirPath, boolean 
isMm, Map<Integer, List<Reader>> bucketIdToBucketFilePath) throws IOException {
+      FileSystem fs = dirPath.getFileSystem(conf);
+      FileStatus[] fileStatuses =
+              fs.listStatus(dirPath, isMm ? AcidUtils.originalBucketFilter : 
AcidUtils.bucketFileFilter);
+      for (FileStatus f : fileStatuses) {
+        final Path fPath = f.getPath();
+        Matcher matcher = isMm ? AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN
+                .matcher(fPath.getName()) : 
AcidUtils.BUCKET_PATTERN.matcher(fPath.getName());
+        if (!matcher.find()) {
+          String errorMessage = String
+                  .format("Found a bucket file matching the bucket pattern! %s 
Matcher=%s", fPath.toString(),
+                          matcher.toString());
+          LOG.error(errorMessage);
+          throw new IllegalArgumentException(errorMessage);
+        }
+        int bucketNum = matcher.groupCount() > 0 ? 
Integer.parseInt(matcher.group(1)) : Integer.parseInt(matcher.group());
+        bucketIdToBucketFilePath.computeIfAbsent(bucketNum, ArrayList::new);
+        Reader reader = OrcFile.createReader(fs, fPath);
+        bucketIdToBucketFilePath.computeIfPresent(bucketNum, (k, v) -> 
v).add(reader);
+      }
+    }
+
+    /**
+     * Generate output path for compaction. This can be used to generate delta 
or base directories.
+     * @param conf hive configuration, must be non-null
+     * @param writeIds list of valid write IDs
+     * @param isBaseDir if base directory path should be generated
+     * @param sd the resolved storadge descriptor
+     * @return output path, always non-null
+     */
+    static Path getCompactionOutputDirPath(HiveConf conf, ValidWriteIdList 
writeIds, boolean isBaseDir,
+                                           StorageDescriptor sd) {
+      long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : 
writeIds.getMinOpenWriteId();
+      long highWatermark = writeIds.getHighWatermark();
+      long compactorTxnId = Compactor.getCompactorTxnId(conf);
+      AcidOutputFormat.Options options = new 
AcidOutputFormat.Options(conf).writingBase(isBaseDir)
+              
.writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
+              
.maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
+      return AcidUtils.baseOrDeltaSubdirPath(new Path(sd.getLocation()), 
options);
+    }
+
+    /**
+     * Merge ORC files from base/delta directories. If the directories 
contains multiple buckets, the result will also
+     * contain the same amount.
+     * @param conf hive configuration
+     * @param includeBaseDir if base directory should be scanned for orc files
+     * @param dir the root directory of the table/partition
+     * @param outputDirPath the result directory path
+     * @param isMm merge orc files from insert only tables
+     * @throws IOException error occurred during file operation
+     */
+    static boolean mergeOrcFiles(HiveConf conf, boolean includeBaseDir, 
AcidDirectory dir,
+                              Path outputDirPath, boolean isMm) throws 
IOException {
+      Map<Integer, List<Reader>> bucketIdToBucketFiles = 
matchBucketIdToBucketFiles(conf, dir, includeBaseDir, isMm);
+      OrcFileMerger fileMerger = new OrcFileMerger(conf);
+      for (Map.Entry<Integer, List<Reader>> e : 
bucketIdToBucketFiles.entrySet()) {
+        fileMerger.checkCompatibility(e.getValue());
+      }
+      boolean isCompatible = true;
+      for (Map.Entry<Integer, List<Reader>> e : 
bucketIdToBucketFiles.entrySet()) {
+        isCompatible &= fileMerger.checkCompatibility(e.getValue());
+      }

Review Comment:
   Why are you iterating and calling checkCompatibility twice?
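
   A single pass should be enough. In the sketch below, `checkCompatibility`
is a hypothetical stand-in that treats a bucket's files as compatible when
they all share one schema string. The real check in OrcFileMerger is not
shown in this hunk:

```java
import java.util.List;
import java.util.Map;

final class CompatibilitySketch {
  // Counts invocations, to make the single pass visible.
  static int checks = 0;

  // Hypothetical stand-in for OrcFileMerger.checkCompatibility(...).
  static boolean checkCompatibility(List<String> schemas) {
    checks++;
    return schemas.stream().distinct().count() <= 1;
  }

  // One loop over the buckets, stopping at the first incompatible one.
  static boolean allBucketsCompatible(Map<Integer, List<String>> bucketIdToFiles) {
    for (List<String> files : bucketIdToFiles.values()) {
      if (!checkCompatibility(files)) {
        return false;
      }
    }
    return true;
  }
}
```

   Unlike the `&=` accumulation, the early return skips the remaining buckets
once one of them fails. That is only appropriate if the check has no side
effects the caller relies on.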



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorChain.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Runs different compactions based on the order provided in the list.
+ * Mainly used for fall back mechanism for Merge compaction.
+ */
+final class CompactorChain implements Compactor {
+
+  private final List<Compactor> compactors;
+
+  CompactorChain(List<Compactor> compactors) {
+    this.compactors = compactors;
+  }
+
+  @Override
+  public boolean run(HiveConf hiveConf, Table table, Partition partition, 
StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo 
compactionInfo, AcidDirectory dir) throws IOException, HiveException, 
InterruptedException {
+    int i = 0;
+    while(i < compactors.size() && !compactors.get(i).run(hiveConf, table, 
partition, storageDescriptor, writeIds, compactionInfo, dir)) {

Review Comment:
   Since we always run the first one, you can use a do-while loop instead.
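
   Something along these lines, sketched with `BooleanSupplier` as a
hypothetical stand-in for the compactors. A do-while loop presupposes at
least one element, so the sketch rejects an empty list explicitly. Whether
the chain can ever be empty in the PR is an assumption to verify:

```java
import java.util.List;
import java.util.function.BooleanSupplier;

final class ChainLoopSketch {
  // Runs the compactors in order until one of them reports success.
  static boolean runChain(List<BooleanSupplier> compactors) {
    if (compactors.isEmpty()) {
      // The do-while body below always executes once, so an empty list must be handled first.
      throw new IllegalArgumentException("at least one compactor is required");
    }
    int i = 0;
    boolean succeeded;
    do {
      succeeded = compactors.get(i).getAsBoolean();
      i++;
    } while (!succeeded && i < compactors.size());
    return succeeded;
  }
}
```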



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Compactor.java:
##########
@@ -59,7 +59,7 @@ static long getCompactorTxnId(Configuration jobConf) {
    * @param dir provides ACID directory layout information
    * @throws IOException compaction cannot be finished.
    */
-  void run(HiveConf hiveConf, Table table, Partition partition, 
StorageDescriptor storageDescriptor,
+  boolean run(HiveConf hiveConf, Table table, Partition partition, 
StorageDescriptor storageDescriptor,

Review Comment:
   Please describe the meaning of the return value in the Javadoc.
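
   For illustration, the `@return` tag could look roughly like the sketch 
below. The semantics shown are an assumption inferred from CompactorChain, 
which moves on to the next compactor while `run` returns false; the wording 
should be adjusted to the actual contract. `ReturnDocumentedCompactor` is a 
hypothetical, parameterless stand-in for the real interface.

```java
// Hypothetical, simplified stand-in for Compactor: the real run(...) takes
// HiveConf, Table, Partition and further arguments that are omitted here.
interface ReturnDocumentedCompactor {

  /**
   * Attempts to run the compaction.
   *
   * @return {@code true} if this compactor performed the compaction;
   *         {@code false} if it did not handle the request, in which case
   *         the caller may fall back to another compactor
   *         (assumed semantics, to be confirmed against the implementation)
   */
  boolean run();
}
```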



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -2951,4 +2952,170 @@ public void testStatsAfterCompactionPartTbl(boolean 
isQueryBased, boolean isAuto
     Assert.assertEquals("The number of rows is differing from the expected", 
"2", parameters.get("numRows"));
     executeStatementOnDriver("drop table if exists " + tblName, driver);
   }
+
+  @Test
+  public void testMajorCompactionWithMergeNotPartitionedWithoutBuckets() 
throws Exception {
+    testCompactionWithMerge(CompactionType.MAJOR, false, false, null, 
Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("base_0000003_v0000007"), true, true);
+  }
+
+  @Test
+  public void testMajorCompactionWithMergePartitionedWithoutBuckets() throws 
Exception {
+    testCompactionWithMerge(CompactionType.MAJOR, true, false, "ds=today", 
Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("base_0000003_v0000007"), true, true);
+  }
+
+  @Test
+  public void testMajorCompactionWithMergeNotPartitionedWithBuckets() throws 
Exception {
+    testCompactionWithMerge(CompactionType.MAJOR, false, true, null, 
Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("base_0000003_v0000007"), true, true);
+  }
+
+  @Test
+  public void testMajorCompactionWithMergerPartitionedWithBuckets() throws 
Exception {
+    testCompactionWithMerge(CompactionType.MAJOR, true, true, "ds=today", 
Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("base_0000003_v0000007"), true, true);
+  }
+
+  @Test
+  public void testMinorCompactionWithMergeNotPartitionedWithoutBuckets() 
throws Exception {
+    testCompactionWithMerge(CompactionType.MINOR, false, false, null,
+            Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("delta_0000001_0000003_v0000007"), true, 
true);
+  }
+
+  @Test
+  public void testMinorCompactionWithMergePartitionedWithoutBuckets() throws 
Exception {
+    testCompactionWithMerge(CompactionType.MINOR, true, false, "ds=today",
+            Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("delta_0000001_0000003_v0000007"), true, 
true);
+  }
+
+  @Test
+  public void testMinorCompactionWithMergeNotPartitionedWithBuckets() throws 
Exception {
+    testCompactionWithMerge(CompactionType.MINOR, false, true, null,
+            Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("delta_0000001_0000003_v0000007"), true, 
true);
+  }
+
+  @Test
+  public void testMinorCompactionWithMergePartitionedWithBuckets() throws 
Exception {
+    testCompactionWithMerge(CompactionType.MINOR, true, true, "ds=today",
+            Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("delta_0000001_0000003_v0000007"), true, 
true);
+  }
+
+  @Test
+  public void testMajorCompactionAfterMinorWithMerge() throws Exception {
+    testCompactionWithMerge(CompactionType.MINOR, true, true, "ds=today",
+            Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("delta_0000001_0000003_v0000007"),true, 
false);
+    testCompactionWithMerge(CompactionType.MAJOR, true, true, "ds=today",
+            Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000003_v0000007", 
"delta_0000004_0000004_0000", "delta_0000005_0000005_0000",
+                    "delta_0000006_0000006_0000"), 
Collections.singletonList("base_0000006_v0000014"), false, true);
+  }
+
+  @Test
+  public void testMinorCompactionAfterMajorWithMerge() throws Exception {
+    testCompactionWithMerge(CompactionType.MAJOR, false, false, null,
+            Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("base_0000003_v0000007"), true, false);
+    testCompactionWithMerge(CompactionType.MINOR, false, false, null,
+            Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000004_0000004_0000", 
"delta_0000005_0000005_0000", "delta_0000006_0000006_0000"),
+            Collections.singletonList("delta_0000001_0000006_v0000014"), 
false, true);
+  }
+
+  @Test
+  public void testMultipleMajorCompactionWithMerge() throws Exception {
+    testCompactionWithMerge(CompactionType.MAJOR, true, true, "ds=today",
+            Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("base_0000003_v0000007"), true, false);
+    testCompactionWithMerge(CompactionType.MAJOR, true, true, "ds=today",
+            Arrays.asList("bucket_00000", "bucket_00001"),
+            Arrays.asList("delta_0000004_0000004_0000", 
"delta_0000005_0000005_0000", "delta_0000006_0000006_0000"),
+            Collections.singletonList("base_0000006_v0000014"), false, true);
+  }
+
+  @Test
+  public void testMultipleMinorCompactionWithMerge() throws Exception {
+    testCompactionWithMerge(CompactionType.MINOR, false, false, null,
+            Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000003_0000003_0000"),
+            Collections.singletonList("delta_0000001_0000003_v0000007"), true, 
false);
+    testCompactionWithMerge(CompactionType.MINOR, false, false, null,
+            Collections.singletonList("bucket_00000"),
+            Arrays.asList("delta_0000001_0000003_v0000007", 
"delta_0000004_0000004_0000", "delta_0000005_0000005_0000",
+                    "delta_0000006_0000006_0000"),
+            Collections.singletonList("delta_0000001_0000006_v0000014"), 
false, true);
+  }
+
+  private void testCompactionWithMerge(CompactionType compactionType, boolean 
isPartitioned, boolean isBucketed,

Review Comment:
   You should not only check the result of the compaction, but also verify that 
MergeCompactor was used rather than one of the others. Since the result is the 
same either way, you cannot be sure that it was produced by MergeCompactor. 
Also, please add some negative test cases where MergeCompactor returns false, 
and check that the fallback compactors were used in those cases. You will need 
to mock some classes to achieve this.
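
   One way to check which compactor ran is sketched below, using a 
hand-written recording fake instead of a mocking library so that the example 
stays self-contained. All names here (`FakeableCompactor`, 
`RecordingCompactor`, `FallbackChain`, `FallbackChainCheck`) are hypothetical, 
and `run()` omits the real parameters; in the actual test suite a mocking 
framework would likely take the place of `RecordingCompactor`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for Compactor (real parameters omitted).
interface FakeableCompactor {
  boolean run();
}

// Test double that returns a fixed result and counts how often it was called.
final class RecordingCompactor implements FakeableCompactor {
  private final boolean result;
  private int invocations = 0;

  RecordingCompactor(boolean result) {
    this.result = result;
  }

  @Override
  public boolean run() {
    invocations++;
    return result;
  }

  int invocations() {
    return invocations;
  }
}

// Minimal chain: tries each compactor in order until one returns true.
final class FallbackChain implements FakeableCompactor {
  private final List<FakeableCompactor> compactors;

  FallbackChain(List<FakeableCompactor> compactors) {
    this.compactors = compactors;
  }

  @Override
  public boolean run() {
    for (FakeableCompactor compactor : compactors) {
      if (compactor.run()) {
        return true;
      }
    }
    return false;
  }
}

final class FallbackChainCheck {
  static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }

  public static void main(String[] args) {
    // Positive case: the merge compactor succeeds, the fallback stays unused.
    RecordingCompactor merge = new RecordingCompactor(true);
    RecordingCompactor fallback = new RecordingCompactor(true);
    boolean done = new FallbackChain(Arrays.<FakeableCompactor>asList(merge, fallback)).run();
    check(done, "chain should succeed");
    check(merge.invocations() == 1, "merge compactor should run once");
    check(fallback.invocations() == 0, "fallback should not run");

    // Negative case: the merge compactor declines, so the fallback must run.
    RecordingCompactor decliningMerge = new RecordingCompactor(false);
    RecordingCompactor usedFallback = new RecordingCompactor(true);
    done = new FallbackChain(Arrays.<FakeableCompactor>asList(decliningMerge, usedFallback)).run();
    check(done, "chain should succeed through the fallback");
    check(decliningMerge.invocations() == 1, "merge compactor should be tried once");
    check(usedFallback.invocations() == 1, "fallback should run once");
  }
}
```

   Asserting on invocation counts, and not only on the resulting directory 
names, is what distinguishes a merge-based compaction from a query-based one 
that happens to produce the same layout.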





Issue Time Tracking
-------------------

    Worklog Id:     (was: 837445)
    Time Spent: 1h 20m  (was: 1h 10m)

> Merge delta files instead of running a query in major/minor compaction
> ----------------------------------------------------------------------
>
>                 Key: HIVE-22977
>                 URL: https://issues.apache.org/jira/browse/HIVE-22977
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: László Pintér
>            Assignee: Sourabh Badhya
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-22977.01.patch, HIVE-22977.02.patch
>
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> [Compaction Optimization]
> We should analyse the possibility to move a delta file instead of running a 
> major/minor compaction query.
> Please consider the following use cases:
>  - full acid table but only insert queries were run. This means that no 
> delete delta directories were created. Is it possible to merge the delta 
> directory contents without running a compaction query?
>  - full acid table, initiating queries through the streaming API. If there 
> are no aborted transactions during the streaming, is it possible to merge the 
> delta directory contents without running a compaction query?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
