This is an automated email from the ASF dual-hosted git repository.

klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new c6ebd72  HIVE-23703: Major QB compaction with multiple FileSinkOperators results in data loss and one original file (Karen Coppage reviewed by Marta Kuczora)
c6ebd72 is described below

commit c6ebd72a54c24671532e5024e0f430bb677e9bbe
Author: Karen Coppage <karenlcopp...@gmail.com>
AuthorDate: Tue Jun 23 11:02:44 2020 +0200

    HIVE-23703: Major QB compaction with multiple FileSinkOperators results in data loss and one original file (Karen Coppage reviewed by Marta Kuczora)
    
    Closes #1134
---
 .../hive/ql/txn/compactor/CompactorOnTezTest.java  |   4 +-
 .../ql/txn/compactor/TestCrudCompactorOnMr.java    |  30 ++
 .../ql/txn/compactor/TestCrudCompactorOnTez.java   | 577 ++++++++++++++++-----
 .../hive/ql/txn/compactor/TestMmCompactorOnMr.java |  28 +
 .../ql/txn/compactor/TestMmCompactorOnTez.java     |   2 +-
 .../hive/ql/exec/AbstractFileMergeOperator.java    |  11 +
 .../hadoop/hive/ql/exec/FileSinkOperator.java      |  19 +-
 .../hadoop/hive/ql/exec/OrcFileMergeOperator.java  |  29 +-
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |  42 +-
 .../hadoop/hive/ql/optimizer/GenMapRedUtils.java   |   3 +
 .../apache/hadoop/hive/ql/plan/FileMergeDesc.java  |   9 +
 .../hadoop/hive/ql/metadata/TestHiveCopyFiles.java |  18 +-
 12 files changed, 604 insertions(+), 168 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index 05d72ba..71232de 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -52,6 +52,7 @@ public class CompactorOnTezTest {
   protected HiveConf conf;
   protected IMetaStoreClient msClient;
   protected IDriver driver;
+  protected boolean runsOnTez = true;
 
   @Before
   // Note: we create a new conf and driver object before every test
@@ -274,7 +275,8 @@ public class CompactorOnTezTest {
 
     protected List<String> getBucketData(String tblName, String bucketId) throws Exception {
       return executeStatementOnDriverAndReturnResults(
-          "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order by ROW__ID", driver);
+          "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order"
+              + " by a, b", driver);
     }
 
     protected void dropTable(String tblName) throws Exception {
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnMr.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnMr.java
new file mode 100644
index 0000000..c60ff1d
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnMr.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Before;
+
+@SuppressWarnings("deprecation")
+public class TestCrudCompactorOnMr extends TestCrudCompactorOnTez {
+  @Before
+  public void setMr() {
+    driver.getConf().setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+    runsOnTez = false;
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 4fb7860..2cd98ae 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.List;
 
 import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -43,12 +42,19 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hive.streaming.HiveStreamingConnection;
 import org.apache.hive.streaming.StreamingConnection;
 import org.apache.hive.streaming.StrictDelimitedInputWriter;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
 import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
@@ -60,11 +66,75 @@ import static org.mockito.Mockito.mock;
 public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
-  public void testMajorCompaction() throws Exception {
+  public void testMajorCompactionNotPartitionedWithoutBuckets() throws Exception {
+    String dbName = "default";
+    String tblName = "testMajorCompaction";
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createFullAcidTable(tblName, false, false);
+    testDataProvider.insertTestData(tblName);
+    // Find the location of the table
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    FileSystem fs = FileSystem.get(conf);
+    // Verify deltas (delta_0000001_0000001_0000, delta_0000002_0000002_0000) are present
+    Assert.assertEquals("Delta directories does not match before compaction",
+        Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000",
+            "delta_0000004_0000004_0000"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+    // Verify that delete delta (delete_delta_0000003_0000003_0000) is present
+    Assert.assertEquals("Delete directories does not match",
+        Arrays.asList("delete_delta_0000003_0000003_0000", "delete_delta_0000005_0000005_0000"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null));
+
+    List<String> expectedRsBucket0 = new ArrayList<>(Arrays.asList(
+        "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t2\t3",
+        "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t2\t4",
+        "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3",
+        "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t3\t4",
+        "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":4}\t4\t3",
+        "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":5}\t4\t4",
+        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2",
+        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3",
+        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t5\t4",
+        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t2",
+        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":4}\t6\t3",
+        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":5}\t6\t4"));
+    // Check bucket contents
+    Assert.assertEquals("pre-compaction bucket 0", expectedRsBucket0,
+        testDataProvider.getBucketData(tblName, "536870912"));
+
+    // Run major compaction and cleaner
+    CompactorTestUtil.runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true);
+    CompactorTestUtil.runCleaner(conf);
+    verifySuccessfulCompaction(1);
+    // Should contain only one base directory now
+    String expectedBase = "base_0000005_v0000009";
+    Assert.assertEquals("Base directory does not match after major compaction",
+        Collections.singletonList(expectedBase),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
+    // Check base dir contents
+    List<String> expectedBucketFiles = Arrays.asList("bucket_00000");
+    Assert.assertEquals("Bucket names are not matching after compaction", 
expectedBucketFiles,
+        CompactorTestUtil
+            .getBucketFileNames(fs, table, null, expectedBase));
+    // Check bucket contents
+    Assert.assertEquals("post-compaction bucket 0", expectedRsBucket0,
+        testDataProvider.getBucketData(tblName, "536870912"));
+    // Check bucket file contents
+    checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 0);
+  }
+
+  /**
+   * TestDataProvider uses 2 buckets, I want to test 4 buckets here.
+   * @throws Exception
+   */
+  @Test
+  public void testMajorCompactionNotPartitioned4Buckets() throws Exception {
     String dbName = "default";
     String tblName = "testMajorCompaction";
     executeStatementOnDriver("drop table if exists " + tblName, driver);
-    executeStatementOnDriver("create transactional table " + tblName + " (a 
int, b int) clustered by (a) into 2 buckets"
+    executeStatementOnDriver("create transactional table " + tblName + " (a 
int, b int) clustered"
+        + " by (a) into 4 buckets"
         + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 
'transactional'='true',"
         + " 'transactional_properties'='default')", driver);
     executeStatementOnDriver("insert into " + tblName + " 
values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver);
@@ -72,77 +142,270 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
     // Find the location of the table
     IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+
     Table table = msClient.getTable(dbName, tblName);
     FileSystem fs = FileSystem.get(conf);
     // Verify deltas (delta_0000001_0000001_0000, delta_0000002_0000002_0000) are present
-    FileStatus[] filestatus = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
-    String[] deltas = new String[filestatus.length];
-    for (int i = 0; i < deltas.length; i++) {
-      deltas[i] = filestatus[i].getPath().getName();
-    }
-    Arrays.sort(deltas);
-    String[] expectedDeltas = new String[] { "delta_0000001_0000001_0000", "delta_0000002_0000002_0000" };
-    if (!Arrays.deepEquals(expectedDeltas, deltas)) {
-      Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
-    }
+    Assert.assertEquals("Delta directories does not match before compaction",
+        Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
     // Verify that delete delta (delete_delta_0000003_0000003_0000) is present
-    FileStatus[] deleteDeltaStat = fs.listStatus(new Path(table.getSd().getLocation()),
-        AcidUtils.deleteEventDeltaDirFilter);
-    String[] deleteDeltas = new String[deleteDeltaStat.length];
-    for (int i = 0; i < deleteDeltas.length; i++) {
-      deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-    }
-    Arrays.sort(deleteDeltas);
-    String[] expectedDeleteDeltas = new String[] { "delete_delta_0000003_0000003_0000" };
-    if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
-      Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
+    Assert.assertEquals("Delete directories does not match",
+        Arrays.asList("delete_delta_0000003_0000003_0000"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null));
+    List<String> expectedRsBucket0 = new ArrayList<>(Arrays.asList(
+        "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3",
+        "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4"
+    ));
+    List<String> expectedRsBucket1 = new ArrayList<>(Arrays.asList(
+        "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3",
+        "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t1\t4",
+        "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3",
+        "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":2}\t4\t4"
+    ));
+    List<String> expectedRsBucket2 = new ArrayList<>(Arrays.asList(
+        "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":1}\t3\t3",
+        "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":2}\t3\t4"
+    ));
+    TestDataProvider testDataProvider = new TestDataProvider();
+    List<String> preCompactionRsBucket0 = testDataProvider.getBucketData(tblName, "536870912");
+    List<String> preCompactionRsBucket1 = testDataProvider.getBucketData(tblName, "536936448");
+    List<String> preCompactionRsBucket2 = testDataProvider.getBucketData(tblName, "537001984");
+    if (runsOnTez) { // Check bucket contents
+      Assert.assertEquals("pre-compaction bucket 0", expectedRsBucket0, 
preCompactionRsBucket0);
+      Assert.assertEquals("pre-compaction bucket 1", expectedRsBucket1, 
preCompactionRsBucket1);
+      Assert.assertEquals("pre-compaction bucket 2", expectedRsBucket2, 
preCompactionRsBucket2);
+    } else {
+      // MR sometimes inserts rows in the opposite order from Tez, so rowids 
won't match. so we
+      // just check whether the bucket contents change during compaction.
+      expectedRsBucket0 = preCompactionRsBucket0;
+      expectedRsBucket1 = preCompactionRsBucket1;
+      expectedRsBucket2 = preCompactionRsBucket2;
     }
-    List<String> expectedRsBucket0 = new ArrayList<>();
-    expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3");
-    expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4");
-    expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3");
-    expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t3\t4");
-    List<String> expectedRsBucket1 = new ArrayList<>();
-    expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3");
-    expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t1\t4");
-    expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3");
-    expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":2}\t4\t4");
-    // Bucket 0
-    List<String> rsBucket0 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
-        + " where ROW__ID.bucketid = 536870912 order by ROW__ID", driver);
-    Assert.assertEquals("normal read", expectedRsBucket0, rsBucket0);
-    // Bucket 1
-    List<String> rsBucket1 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName
-        + " where ROW__ID.bucketid = 536936448 order by ROW__ID", driver);
-    Assert.assertEquals("normal read", expectedRsBucket1, rsBucket1);
+
     // Run major compaction and cleaner
     CompactorTestUtil.runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true);
     CompactorTestUtil.runCleaner(conf);
+    verifySuccessfulCompaction(1);
     // Should contain only one base directory now
-    filestatus = fs.listStatus(new Path(table.getSd().getLocation()));
-    String[] bases = new String[filestatus.length];
-    for (int i = 0; i < bases.length; i++) {
-      bases[i] = filestatus[i].getPath().getName();
+    String expectedBase = "base_0000003_v0000009";
+    Assert.assertEquals("Base directory does not match after major compaction",
+        Collections.singletonList(expectedBase),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
+    // Check files in base
+    List<String> expectedBucketFiles = Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002");
+    Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+        CompactorTestUtil
+            .getBucketFileNames(fs, table, null, "base_0000003_v0000009"));
+    // Check buckets contents
+    Assert.assertEquals("post-compaction bucket 0", expectedRsBucket0, testDataProvider.getBucketData(tblName,
+      "536870912"));
+    Assert.assertEquals("post-compaction bucket 1", expectedRsBucket1, testDataProvider.getBucketData(tblName,
+      "536936448"));
+    Assert.assertEquals("post-compaction bucket 2", expectedRsBucket2, testDataProvider.getBucketData(tblName,
+      "537001984"));
+    // Check bucket file contents
+    checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 0);
+    checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 1);
+    checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 2);
+  }
+
+  @Test
+  public void testMajorCompactionPartitionedWithoutBuckets() throws Exception {
+    String dbName = "default";
+    String tblName = "testMajorCompaction";
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createFullAcidTable(tblName, true, false);
+    testDataProvider.insertTestDataPartitioned(tblName);
+    // Find the location of the table
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    String tablePath = table.getSd().getLocation();
+    String partitionToday = "ds=today";
+    String partitionTomorrow = "ds=tomorrow";
+    String partitionYesterday = "ds=yesterday";
+    Path todayPath = new Path(tablePath, partitionToday);
+    Path tomorrowPath = new Path(tablePath, partitionTomorrow);
+    Path yesterdayPath = new Path(tablePath, partitionYesterday);
+    FileSystem fs = FileSystem.get(conf);
+    // Verify deltas
+    Assert.assertEquals("Delta directories does not match",
+        Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000004_0000004_0000"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, 
table, partitionToday));
+    // Verify delete delta
+    Assert.assertEquals("Delete directories does not match",
+        Arrays.asList("delete_delta_0000003_0000003_0000", 
"delete_delta_0000005_0000005_0000"),
+        CompactorTestUtil
+            .getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, 
table, partitionToday));
+
+    List<String> expectedRsBucket0 = new ArrayList<>(Arrays.asList(
+        "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3\tyesterday",
+        "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4\ttoday",
+        "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday",
+        "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4\tyesterday",
+        "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t4\t3\ttomorrow",
+        "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t4\t4\ttoday",
+        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2\tyesterday",
+        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday",
+        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t4\ttoday",
+        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t6\t2\ttoday",
+        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t6\t3\ttoday",
+        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t4\ttoday"));
+    if (runsOnTez) { // Check bucket contents
+      Assert.assertEquals("pre-compaction bucket 0", expectedRsBucket0,
+          testDataProvider.getBucketData(tblName, "536870912"));
+    } else {
+      // MR sometimes inserts rows in the opposite order from Tez, so rowids won't match. so we
+      // just check whether the bucket contents change during compaction.
+      expectedRsBucket0 = testDataProvider.getBucketData(tblName, "536870912");
     }
-    Arrays.sort(bases);
-    String[] expectedBases = new String[] { "base_0000003_v0000008" };
-    if (!Arrays.deepEquals(expectedBases, bases)) {
-      Assert.fail("Expected: " + Arrays.toString(expectedBases) + ", found: " 
+ Arrays.toString(bases));
+
+    // Run major compaction and cleaner for all 3 partitions
+    CompactorTestUtil.runCompaction(conf, dbName, tblName, 
CompactionType.MAJOR, true,
+        partitionToday, partitionTomorrow, partitionYesterday);
+    CompactorTestUtil.runCleaner(conf);
+    // 3 compaction should be in the response queue with succeeded state
+    verifySuccessfulCompaction( 3);
+    // Should contain only one base directory now
+    Assert.assertEquals("Base directory does not match after major compaction",
+        Collections.singletonList("base_0000005_v0000009"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, 
table, partitionToday));
+    Assert.assertEquals("Base directory does not match after major compaction",
+        Collections.singletonList("base_0000005_v0000013"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, 
table, partitionTomorrow));
+    Assert.assertEquals("Base directory does not match after major compaction",
+        Collections.singletonList("base_0000005_v0000017"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, 
table, partitionYesterday));
+    // Check base dir contents
+    List<String> expectedBucketFiles = Arrays.asList("bucket_00000");
+    Assert.assertEquals("Bucket names are not matching after compaction", 
expectedBucketFiles,
+        CompactorTestUtil
+            .getBucketFileNames(fs, table, partitionToday, 
"base_0000005_v0000009"));
+    Assert.assertEquals("Bucket names are not matching after compaction", 
expectedBucketFiles,
+        CompactorTestUtil
+            .getBucketFileNames(fs, table, partitionTomorrow, 
"base_0000005_v0000013"));
+    Assert.assertEquals("Bucket names are not matching after compaction", 
expectedBucketFiles,
+        CompactorTestUtil
+            .getBucketFileNames(fs, table, partitionYesterday, 
"base_0000005_v0000017"));
+    // Check buckets contents
+    Assert.assertEquals("post-compaction bucket 0", expectedRsBucket0,
+        testDataProvider.getBucketData(tblName, "536870912"));
+    // Check bucket file contents
+    checkBucketIdAndRowIdInAcidFile(fs, new Path(todayPath, 
"base_0000005_v0000009"), 0);
+    checkBucketIdAndRowIdInAcidFile(fs, new Path(tomorrowPath, 
"base_0000005_v0000013"), 0);
+    checkBucketIdAndRowIdInAcidFile(fs, new Path(yesterdayPath, 
"base_0000005_v0000017"), 0);
+  }
+
+  @Test public void testMajorCompactionPartitionedWithBuckets() throws Exception {
+    String dbName = "default";
+    String tableName = "testMajorCompaction";
+    // Create test table
+    TestDataProvider dataProvider = new TestDataProvider();
+    dataProvider.createFullAcidTable(tableName, true, true);
+    // Find the location of the table
+    IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+    Table table = metaStoreClient.getTable(dbName, tableName);
+    FileSystem fs = FileSystem.get(conf);
+    // Insert test data into test table
+    dataProvider.insertTestDataPartitioned(tableName);
+    // Get all data before compaction is run
+    List<String> expectedData = dataProvider.getAllData(tableName);
+    // Verify deltas
+    String partitionToday = "ds=today";
+    String partitionTomorrow = "ds=tomorrow";
+    String partitionYesterday = "ds=yesterday";
+    Assert.assertEquals("Delta directories does not match",
+        Arrays.asList("delta_0000001_0000001_0000", 
"delta_0000002_0000002_0000", "delta_0000004_0000004_0000"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, 
table, partitionToday));
+    // Verify delete delta
+    Assert.assertEquals("Delete directories does not match",
+        Arrays.asList("delete_delta_0000003_0000003_0000", 
"delete_delta_0000005_0000005_0000"),
+        CompactorTestUtil
+            .getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, 
table, partitionToday));
+    // Check bucket contents
+    List<String> expectedRsBucket0 = Arrays.asList(
+        "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday",
+        "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4\tyesterday");
+    List<String> rsBucket0 = dataProvider.getBucketData(tableName, 
"536870912");
+
+    List<String> expectedRsBucket1 = Arrays.asList(
+        "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t3\tyesterday",
+        "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4\ttoday",
+        "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t3\ttomorrow",
+        "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4\ttoday",
+        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t2\tyesterday",
+        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday",
+        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t4\ttoday",
+        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t6\t2\ttoday",
+        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":2}\t6\t3\ttoday",
+        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":3}\t6\t4\ttoday");
+    List<String> rsBucket1 = dataProvider.getBucketData(tableName, "536936448");
+    if (runsOnTez) {
+      Assert.assertEquals(expectedRsBucket0, rsBucket0);
+      Assert.assertEquals(expectedRsBucket1, rsBucket1);
+    } else {
+      // MR sometimes inserts rows in the opposite order from Tez, so rowids won't match. so we
+      // just check whether the bucket contents change during compaction.
+      expectedRsBucket0 = rsBucket0;
+      expectedRsBucket1 = rsBucket1;
     }
+
+    // Run a compaction
+    CompactorTestUtil
+        .runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true, partitionToday,
+            partitionTomorrow,
+            partitionYesterday);
+    // Clean up resources
+    CompactorTestUtil.runCleaner(conf);
+    // 3 compactions should be in the response queue with succeeded state
+    verifySuccessfulCompaction( 3);
+    // Verify base directories after compaction in each partition
+    String expectedBaseToday = "base_0000005_v0000011";
+    String expectedBaseTomorrow = "base_0000005_v0000015";
+    String expectedBaseYesterday = "base_0000005_v0000019";
+    List<String> baseDeltasInToday =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, partitionToday);
+    Assert.assertEquals("Delta directories does not match after compaction",
+        Collections.singletonList(expectedBaseToday), baseDeltasInToday);
+    List<String> baseDeltasInTomorrow =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, partitionTomorrow);
+    Assert.assertEquals("Delta directories does not match after compaction",
+        Collections.singletonList(expectedBaseTomorrow), baseDeltasInTomorrow);
+    List<String> baseDeltasInYesterday =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, partitionYesterday);
+    Assert.assertEquals("Delta directories does not match after compaction",
+        Collections.singletonList(expectedBaseYesterday), baseDeltasInYesterday);
+    // Verify contents of bases
+    Assert.assertEquals("Bucket names are not matching after compaction", Arrays.asList("bucket_00000", "bucket_00001"),
+        CompactorTestUtil
+            .getBucketFileNames(fs, table, partitionToday, expectedBaseToday));
+    Assert.assertEquals("Bucket names are not matching after compaction", Arrays.asList("bucket_00001"),
+        CompactorTestUtil
+            .getBucketFileNames(fs, table, partitionTomorrow, expectedBaseTomorrow));
+    Assert.assertEquals("Bucket names are not matching after compaction", Arrays.asList("bucket_00000", "bucket_00001"),
+        CompactorTestUtil
+            .getBucketFileNames(fs, table, partitionYesterday, expectedBaseYesterday));
+    // Verify contents of bucket files.
     // Bucket 0
-    List<String> rsCompactBucket0 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from  " + tblName
-        + " where ROW__ID.bucketid = 536870912", driver);
-    Assert.assertEquals("compacted read", rsBucket0, rsCompactBucket0);
+    rsBucket0 = dataProvider.getBucketData(tableName, "536870912");
+    Assert.assertEquals(expectedRsBucket0, rsBucket0);
     // Bucket 1
-    List<String> rsCompactBucket1 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from  " + tblName
-        + " where ROW__ID.bucketid = 536936448", driver);
-    Assert.assertEquals("compacted read", rsBucket1, rsCompactBucket1);
-    // Clean up
-    executeStatementOnDriver("drop table " + tblName, driver);
+    rsBucket1 = dataProvider.getBucketData(tableName, "536936448");
+    Assert.assertEquals(expectedRsBucket1, rsBucket1);
+    // Verify all contents
+    List<String> actualData = dataProvider.getAllData(tableName);
+    Assert.assertEquals(expectedData, actualData);
+    String tablePath = table.getSd().getLocation();
+    checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionToday), expectedBaseToday), 0);
+    checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionToday), expectedBaseToday), 1);
+    checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionTomorrow), expectedBaseTomorrow), 1);
+    checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionYesterday), expectedBaseYesterday), 0);
+    checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionYesterday), expectedBaseYesterday), 1);
   }
 
   @Test
   public void testMinorCompactionNotPartitionedWithoutBuckets() throws Exception {
+    Assume.assumeTrue(runsOnTez);
     String dbName = "default";
     String tableName = "testMinorCompaction";
     // Create test table
@@ -169,10 +432,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Clean up resources
     CompactorTestUtil.runCleaner(conf);
     // Only 1 compaction should be in the response queue with succeeded state
-    List<ShowCompactResponseElement> compacts =
-        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size());
-    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+    verifySuccessfulCompaction(1);
     // Verify delta directories after compaction
     List<String> actualDeltasAfterComp =
         CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
@@ -190,7 +450,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
         CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeleteDeltasAfterComp.get(0)));
     // Verify contents of bucket files.
     // Bucket 0
-    List<String> expectedRsBucket0 = Arrays.asList("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t2\t3",
+    List<String> expectedRsBucket0 = Arrays.asList(
+        "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t2\t3",
         "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t2\t4",
         "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3",
         "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t3\t4",
@@ -213,6 +474,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMinorCompactionNotPartitionedWithBuckets() throws Exception {
+    Assume.assumeTrue(runsOnTez);
     String dbName = "default";
     String tableName = "testMinorCompaction";
     // Create test table
@@ -239,10 +501,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Clean up resources
     CompactorTestUtil.runCleaner(conf);
     // Only 1 compaction should be in the response queue with succeeded state
-    List<ShowCompactResponseElement> compacts =
-        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size());
-    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+    verifySuccessfulCompaction(1);
     // Verify delta directories after compaction
     List<String> actualDeltasAfterComp =
         CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
@@ -260,12 +519,14 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
         CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeleteDeltasAfterComp.get(0)));
     // Verify contents of bucket files.
     // Bucket 0
-    List<String> expectedRsBucket0 = Arrays.asList("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3",
+    List<String> expectedRsBucket0 = Arrays.asList(
+        "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3",
         "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t3\t4");
     List<String> rsBucket0 = dataProvider.getBucketData(tableName, "536870912");
     Assert.assertEquals(expectedRsBucket0, rsBucket0);
     // Bucket 1
-    List<String> expectedRs1Bucket = Arrays.asList("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t3",
+    List<String> expectedRs1Bucket = Arrays.asList(
+        "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t3",
         "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t2\t4",
         "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3",
         "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":2}\t4\t4",
@@ -286,6 +547,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMinorCompactionPartitionedWithoutBuckets() throws Exception {
+    Assume.assumeTrue(runsOnTez);
     String dbName = "default";
     String tableName = "testMinorCompaction";
     // Create test table
@@ -318,10 +580,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Clean up resources
     CompactorTestUtil.runCleaner(conf);
     // 3 compaction should be in the response queue with succeeded state
-    List<ShowCompactResponseElement> compacts =
-        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals("Completed compaction queue must contain 3 element", 3, compacts.size());
-    compacts.forEach(c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState()));
+    verifySuccessfulCompaction(3);
     // Verify delta directories after compaction in each partition
     List<String> actualDeltasAfterCompPartToday =
         CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday);
@@ -340,23 +599,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
         CompactorTestUtil
             .getBucketFileNames(fs, table, partitionToday, actualDeleteDeltasAfterCompPartToday.get(0)));
-    // Verify contents of bucket files.
-    // Bucket 0
-    List<String> expectedRsBucket0 = Arrays
-        .asList("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3\tyesterday",
-            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4\ttoday",
-            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday",
-            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4\tyesterday",
-            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t4\t3\ttomorrow",
-            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t4\t4\ttoday",
-            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t4\ttoday",
-            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2\tyesterday",
-            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t6\t2\ttoday",
-            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday",
-            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t6\t3\ttoday",
-            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t4\ttoday");
-    List<String> rsBucket0 = dataProvider.getBucketData(tableName, "536870912");
-    Assert.assertEquals(expectedRsBucket0, rsBucket0);
+
     // Verify all contents
     List<String> actualData = dataProvider.getAllData(tableName);
     Assert.assertEquals(expectedData, actualData);
@@ -366,6 +609,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMinorCompactionPartitionedWithBuckets() throws Exception {
+    Assume.assumeTrue(runsOnTez);
     String dbName = "default";
     String tableName = "testMinorCompaction";
     // Create test table
@@ -397,11 +641,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
             partitionYesterday);
     // Clean up resources
     CompactorTestUtil.runCleaner(conf);
-    // Only 1 compaction should be in the response queue with succeeded state
-    List<ShowCompactResponseElement> compacts =
-        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals("Completed compaction queue must contain 3 element", 3, compacts.size());
-    compacts.forEach(c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState()));
+    // 3 compactions should be in the response queue with succeeded state
+    verifySuccessfulCompaction( 3);
     // Verify delta directories after compaction in each partition
     List<String> actualDeltasAfterCompPartToday =
         CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday);
@@ -422,19 +663,21 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
             .getBucketFileNames(fs, table, partitionToday, actualDeleteDeltasAfterCompPartToday.get(0)));
     // Verify contents of bucket files.
     // Bucket 0
-    List<String> expectedRsBucket0 = Arrays.asList("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday",
+    List<String> expectedRsBucket0 = Arrays.asList(
+        "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday",
         "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4\tyesterday");
     List<String> rsBucket0 = dataProvider.getBucketData(tableName, "536870912");
     Assert.assertEquals(expectedRsBucket0, rsBucket0);
     // Bucket 1
-    List<String> expectedRsBucket1 = Arrays.asList("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4\ttoday",
+    List<String> expectedRsBucket1 = Arrays.asList(
         "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t3\tyesterday",
+        "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4\ttoday",
         "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t3\ttomorrow",
         "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4\ttoday",
-        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t4\ttoday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t2\tyesterday",
-        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t6\t2\ttoday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday",
+        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t4\ttoday",
+        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t6\t2\ttoday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":2}\t6\t3\ttoday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":3}\t6\t4\ttoday");
     List<String> rsBucket1 = dataProvider.getBucketData(tableName, "536936448");
@@ -448,6 +691,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMinorCompaction10DeltaDirs() throws Exception {
+    Assume.assumeTrue(runsOnTez);
     String dbName = "default";
     String tableName = "testMinorCompaction";
     // Create test table
@@ -473,10 +717,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
     // Clean up resources
     CompactorTestUtil.runCleaner(conf);
-    List<ShowCompactResponseElement> compacts =
-        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals("Completed compaction queue must contain 3 element", 1, compacts.size());
-    compacts.forEach(c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState()));
+    verifySuccessfulCompaction( 1);
     // Verify delta directories after compaction
     List<String> actualDeltasAfterComp =
         CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
@@ -503,6 +744,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMultipleMinorCompactions() throws Exception {
+    Assume.assumeTrue(runsOnTez);
     String dbName = "default";
     String tableName = "testMinorCompaction";
     // Create test table
@@ -519,20 +761,15 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Clean up resources
     CompactorTestUtil.runCleaner(conf);
     // Only 1 compaction should be in the response queue with succeeded state
-    List<ShowCompactResponseElement> compacts =
-        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size());
-    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+    verifySuccessfulCompaction(1);
     // Insert test data into test table
     dataProvider.insertTestData(tableName);
     // Run a compaction
     CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
     // Clean up resources
     CompactorTestUtil.runCleaner(conf);
-    // 2 compaction should be in the response queue with succeeded state
-    compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals("Completed compaction queue must contain one element", 2, compacts.size());
-    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState());
+    // 2 compactions should be in the response queue with succeeded state
+    verifySuccessfulCompaction(2);
     // Insert test data into test table
     dataProvider.insertTestData(tableName);
     // Run a compaction
@@ -540,9 +777,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Clean up resources
     CompactorTestUtil.runCleaner(conf);
     // 3 compaction should be in the response queue with succeeded state
-    compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals("Completed compaction queue must contain one element", 3, compacts.size());
-    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(2).getState());
+    verifySuccessfulCompaction(3);
     // Verify delta directories after compaction
     List<String> actualDeltasAfterComp =
         CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
@@ -557,6 +792,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMinorCompactionWhileStreaming() throws Exception {
+    Assume.assumeTrue(runsOnTez);
     String dbName = "default";
     String tableName = "testMinorCompaction";
     executeStatementOnDriver("drop table if exists " + tableName, driver);
@@ -595,6 +831,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMinorCompactionWhileStreamingAfterAbort() throws Exception {
+    Assume.assumeTrue(runsOnTez);
     String dbName = "default";
     String tableName = "testMinorCompaction";
     executeStatementOnDriver("drop table if exists " + tableName, driver);
@@ -620,6 +857,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMinorCompactionWhileStreamingWithAbort() throws Exception {
+    Assume.assumeTrue(runsOnTez);
     String dbName = "default";
     String tableName = "testMinorCompaction";
     executeStatementOnDriver("drop table if exists " + tableName, driver);
@@ -646,6 +884,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMinorCompactionWhileStreamingWithAbortInMiddle() throws Exception {
+    Assume.assumeTrue(runsOnTez);
     String dbName = "default";
     String tableName = "testMinorCompaction";
     executeStatementOnDriver("drop table if exists " + tableName, driver);
@@ -683,6 +922,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMajorCompactionAfterMinor() throws Exception {
+    Assume.assumeTrue(runsOnTez);
     String dbName = "default";
     String tableName = "testMinorCompaction";
     // Create test table
@@ -702,10 +942,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Clean up resources
     CompactorTestUtil.runCleaner(conf);
     // Only 1 compaction should be in the response queue with succeeded state
-    List<ShowCompactResponseElement> compacts =
-        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size());
-    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+    verifySuccessfulCompaction(1);
     // Verify delta directories after compaction
     Assert.assertEquals("Delta directories does not match after minor 
compaction",
         Collections.singletonList("delta_0000001_0000005_v0000009"),
@@ -725,9 +962,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Clean up resources
     CompactorTestUtil.runCleaner(conf);
     // 2 compaction should be in the response queue with succeeded state
-    compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals("Completed compaction queue must contain one element", 2, compacts.size());
-    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState());
+    verifySuccessfulCompaction(2);
     // Verify base directory after compaction
     Assert.assertEquals("Base directory does not match after major compaction",
         Collections.singletonList("base_0000010_v0000029"),
@@ -739,6 +974,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMinorCompactionAfterMajor() throws Exception {
+    Assume.assumeTrue(runsOnTez);
     String dbName = "default";
     String tableName = "testMinorCompaction";
     // Create test table
@@ -758,10 +994,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Clean up resources
     CompactorTestUtil.runCleaner(conf);
     // Only 1 compaction should be in the response queue with succeeded state
-    List<ShowCompactResponseElement> compacts =
-        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size());
-    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+    verifySuccessfulCompaction(1);
     // Verify base directory after compaction
     Assert.assertEquals("Base directory does not match after major compaction",
         Collections.singletonList("base_0000005_v0000009"),
@@ -778,9 +1011,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Clean up resources
     CompactorTestUtil.runCleaner(conf);
     // 2 compaction should be in the response queue with succeeded state
-    compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals("Completed compaction queue must contain one element", 2, compacts.size());
-    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState());
+    verifySuccessfulCompaction(2);
     // Verify base directory after compaction
     Assert.assertEquals("Base directory does not match after major compaction",
         Collections.singletonList("base_0000005_v0000009"),
@@ -795,6 +1026,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMinorCompactionWhileStreamingWithSplitUpdate() throws Exception {
+    Assume.assumeTrue(runsOnTez);
     String dbName = "default";
     String tableName = "testMinorCompaction";
     executeStatementOnDriver("drop table if exists " + tableName, driver);
@@ -851,10 +1083,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
         + " partition (ds) values(3,2,1000,'yesterday'),(3,3,1001,'today'),(3,4,1002,'yesterday'),(4,2,1003,'today'),"
         + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver);
     executeStatementOnDriver("delete from " + tblName + " where b = 2", 
driver);
-    //  Run major compaction and cleaner
-    CompactorTestUtil
-        .runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true, "ds=yesterday", "ds=today");
-    CompactorTestUtil.runCleaner(conf);
+
+    List<String> expectedRsBucket0PtnToday = new ArrayList<>();
     expectedRsBucket0PtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t3\tNULL\ttoday");
     expectedRsBucket0PtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t4\tNULL\ttoday");
@@ -863,12 +1092,33 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     expectedRsBucket1PtnToday.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3\tNULL\ttoday");
     expectedRsBucket1PtnToday.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t4\t4\t1005\ttoday");
     // Bucket 0, partition 'today'
+    List<String> rsBucket0PtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from  "
+        + tblName + " where ROW__ID.bucketid = 536870912 and ds='today' order by a,b", driver);
+    // Bucket 1, partition 'today'
+    List<String> rsBucket1PtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from  "
+        + tblName + " where ROW__ID.bucketid = 536936448 and ds='today' order by a,b", driver);
+    if (runsOnTez) {
+      Assert.assertEquals("pre-compaction read", expectedRsBucket0PtnToday, rsBucket0PtnToday);
+      Assert.assertEquals("pre-compaction read", expectedRsBucket1PtnToday, rsBucket1PtnToday);
+    } else {
+      // MR sometimes inserts rows in the opposite order from Tez, so rowids won't match. so we
+      // just check whether the bucket contents change during compaction.
+      expectedRsBucket0PtnToday = rsBucket0PtnToday;
+      expectedRsBucket1PtnToday = rsBucket1PtnToday;
+    }
+
+    //  Run major compaction and cleaner
+    CompactorTestUtil
+        .runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true, "ds=yesterday", "ds=today");
+    CompactorTestUtil.runCleaner(conf);
+
+    // Bucket 0, partition 'today'
     List<String> rsCompactBucket0PtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from  "
-        + tblName + " where ROW__ID.bucketid = 536870912 and ds='today'", driver);
+        + tblName + " where ROW__ID.bucketid = 536870912 and ds='today' order by a,b", driver);
     Assert.assertEquals("compacted read", expectedRsBucket0PtnToday, rsCompactBucket0PtnToday);
     // Bucket 1, partition 'today'
     List<String> rsCompactBucket1PtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from  "
-        + tblName + " where ROW__ID.bucketid = 536936448 and ds='today'", driver);
+        + tblName + " where ROW__ID.bucketid = 536936448 and ds='today' order by a,b", driver);
     Assert.assertEquals("compacted read", expectedRsBucket1PtnToday, rsCompactBucket1PtnToday);
     // Clean up
     executeStatementOnDriver("drop table " + tblName, driver);
@@ -953,7 +1203,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Run a compaction
     CompactorTestUtil.runCompaction(conf, dbName, tableName, compactionType, true);
     CompactorTestUtil.runCleaner(conf);
-    verifySuccessulTxn(1);
+    verifySuccessfulCompaction(1);
     // Verify directories after compaction
     PathFilter pathFilter = compactionType == CompactionType.MAJOR ? AcidUtils.baseFileFilter :
         AcidUtils.deltaFileFilter;
@@ -973,19 +1223,70 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
   /**
    * Verify that the expected number of transactions have run, and their state is "succeeded".
    *
-   * @param expectedCompleteCompacts number of compactions already run
+   * @param expectedSuccessfulCompactions number of compactions already run
    * @throws MetaException
    */
-  private void verifySuccessulTxn(int expectedCompleteCompacts) throws MetaException {
+  private void verifySuccessfulCompaction(int expectedSuccessfulCompactions) throws MetaException {
     List<ShowCompactResponseElement> compacts =
         TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
-    Assert.assertEquals("Completed compaction queue must contain one element",
-        expectedCompleteCompacts, compacts.size());
+    Assert.assertEquals("Completed compaction queue must contain " + expectedSuccessfulCompactions + " element(s)",
+        expectedSuccessfulCompactions, compacts.size());
     compacts.forEach(
         c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState()));
   }
 
   /**
+   * Read file, and
+   * 1. make sure that the bucket property in each row matches the file name.
+   * For example, if the bucketId is 0, we check file bucket_00000 to make sure that the third
+   * column contains only the value 536870912.
+   * 2. make sure that rowIds are in ascending order
+   * @param fs file system
+   * @param path where to look for the bucket file
+   * @param bucketId bucket Id to check, e.g. 0.
+   */
+  private void checkBucketIdAndRowIdInAcidFile(FileSystem fs, Path path, int bucketId) throws IOException {
+    Path bucketFilePath = AcidUtils.createBucketFile(path, bucketId);
+    Reader orcReader = OrcFile.createReader(bucketFilePath,
+        OrcFile.readerOptions(fs.getConf()).filesystem(fs));
+    TypeDescription schema = orcReader.getSchema();
+    try (RecordReader rows = orcReader.rows()) {
+      VectorizedRowBatch batch = schema.createRowBatch();
+      rows.nextBatch(batch);
+      // check that bucket property in each row matches the bucket in the file name
+      long[] bucketIdVector = ((LongColumnVector) batch.cols[2]).vector;
+      for (int i = 0; i < batch.count(); i++) {
+        Assert.assertEquals(bucketId, decodeBucketProperty(bucketIdVector[i]));
+      }
+      // check that writeIds, then rowIds are sorted in ascending order
+      long[] writeIdVector = ((LongColumnVector) batch.cols[1]).vector;
+      long[] rowIdVector = ((LongColumnVector) batch.cols[3]).vector;
+      long writeId = writeIdVector[0];
+      long rowId = 0;
+      for (int i = 0; i < batch.count(); i++) {
+        long currentWriteId = writeIdVector[i];
+        long currentRowId = rowIdVector[i];
+        if (writeId == writeIdVector[i]) {
+          Assert.assertTrue(rowId <= currentRowId);
+          rowId = currentRowId;
+        } else {
+          Assert.assertTrue(writeId < currentWriteId);
+          writeId = currentWriteId;
+          rowId = 0;
+        }
+      }
+    }
+  }
+
+  /**
+   * Couldn't find any way to get the bucket property from BucketCodec, so just reverse
+   * engineered the encoding: the bucketId occupies the 12 bits starting at bit 16 (see BucketCodec.V1).
+   */
+  private int decodeBucketProperty(long bucketCodec) {
+    return (int) ((bucketCodec >> 16) & (0xFFF));
+  }
+
+  /**
    * Tests whether hive.llap.io.etl.skip.format config is handled properly 
whenever QueryCompactor#runCompactionQueries
    * is invoked.
    * @throws Exception
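
For reference, a minimal standalone sketch of the bucket property handling that
checkBucketIdAndRowIdInAcidFile relies on: it mirrors the shift-and-mask in
decodeBucketProperty above and assumes the V1 layout (bucketId in the 12 bits starting at
bit 16). The class and helper names are illustrative only and are not part of this patch.

    // Illustrative sketch only -- plain Java, no Hive dependencies.
    public class BucketPropertySketch {
      // same shift-and-mask as decodeBucketProperty in the test above
      static int decodeBucketId(long bucketProperty) {
        return (int) ((bucketProperty >> 16) & 0xFFF);
      }

      // assumed V1 encoding: version marker at 1 << 29, bucketId in bits 16-27, statementId in the low bits
      static long encodeV1(int bucketId, int statementId) {
        return (1L << 29) | ((long) bucketId << 16) | statementId;
      }

      public static void main(String[] args) {
        long prop = encodeV1(0, 0);
        System.out.println(prop);                            // 536870912, the value the test expects for bucket 0
        System.out.println(decodeBucketId(prop));            // 0
        System.out.println(decodeBucketId(encodeV1(3, 0)));  // 3
      }
    }
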
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnMr.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnMr.java
new file mode 100644
index 0000000..7ec23f7
--- /dev/null
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnMr.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Before;
+
+public class TestMmCompactorOnMr extends TestMmCompactorOnTez {
+  @Before
+  public void setMr() {
+    driver.getConf().setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+  }
+}
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
index 451390a..9772bbe 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
@@ -45,7 +45,7 @@ import java.util.List;
 import static 
org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
 
 /**
- * Test functionality of MmMinorQueryCompactor,.
+ * Test functionality of MmMinorQueryCompactor.
  */
 public class TestMmCompactorOnTez extends CompactorOnTezTest {
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index d8f8e72..87e6761 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
@@ -116,6 +117,10 @@ public abstract class AbstractFileMergeOperator<T extends 
FileMergeDesc>
       taskTmpPath = null;
       // Make sure we don't collide with the source.
       outPath = finalPath = new Path(tmpPath, taskId + ".merged");
+    } else if (conf.getIsCompactionTable()) {
+      taskTmpPath = ttp; // _task_tmp
+      finalPath = tp; // _tmp
+      outPath = ttp; // also _task_tmp
     } else {
       taskTmpPath = ttp;
       finalPath = new Path(tp, taskId);
@@ -373,6 +378,12 @@ public abstract class AbstractFileMergeOperator<T extends 
FileMergeDesc>
     return outPath;
   }
 
+  protected final Path getOutPath(int bucketId) {
+    String fileName = AcidUtils.BUCKET_PREFIX + 
String.format(AcidUtils.BUCKET_DIGITS, bucketId);
+    Path out = new Path(outPath, fileName);
+    return out;
+  }
+
   protected final void addIncompatibleFile(Path path) {
     incompatFileSet.add(path);
   }
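
The new getOutPath(int bucketId) above derives the per-bucket output file name from the
bucket id. Below is a small standalone sketch of that name construction, assuming
AcidUtils.BUCKET_PREFIX is "bucket_" and AcidUtils.BUCKET_DIGITS is "%05d" (neither
constant is shown in this diff, so treat the literals as assumptions).

    // Illustrative sketch only: per-bucket file name construction.
    public class BucketFileNameSketch {
      public static void main(String[] args) {
        String bucketPrefix = "bucket_";  // stands in for AcidUtils.BUCKET_PREFIX
        String bucketDigits = "%05d";     // stands in for AcidUtils.BUCKET_DIGITS
        int bucketId = 3;
        String fileName = bucketPrefix + String.format(bucketDigits, bucketId);
        System.out.println(fileName);     // bucket_00003, created under the operator's outPath
      }
    }
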
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 22a24f8..023f7df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -334,6 +334,18 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
         if (!isMmTable && !isDirectInsert) {
           if (!bDynParts && !isSkewedStoredAsSubDirectories) {
             finalPaths[filesIdx] = new Path(parent, taskWithExt);
+            if (conf.isCompactionTable()) {
+              // Helper tables used for compaction are external and non-acid. 
We need to keep
+              // track of the taskId to avoid overwrites in the case of 
multiple
+              // FileSinkOperators, and the file names need to reflect the 
correct bucketId
+              // because the files will eventually be placed in an acid table, 
and the
+              // OrcFileMergeOperator should not merge data belonging to 
different buckets.
+              // Therefore during compaction, data will be stored in the final 
directory like:
+              // ${hive_staging_dir}/final_dir/taskid/bucketId
+              // For example, ${hive_staging_dir}/-ext-10002/000000_0/bucket_00000
+              finalPaths[filesIdx] = new Path(finalPaths[filesIdx],
+                  AcidUtils.BUCKET_PREFIX + 
String.format(AcidUtils.BUCKET_DIGITS, bucketId));
+            }
           } else {
             finalPaths[filesIdx] =  new Path(buildTmpPath(), taskWithExt);
           }
@@ -801,12 +813,7 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
   protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
       throws HiveException {
     try {
-      if (conf.isCompactionTable()) {
-        fsp.initializeBucketPaths(filesIdx, AcidUtils.BUCKET_PREFIX + 
String.format(AcidUtils.BUCKET_DIGITS, bucketId),
-            isNativeTable(), isSkewedStoredAsSubDirectories);
-      } else {
-        fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), 
isSkewedStoredAsSubDirectories);
-      }
+      fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), 
isSkewedStoredAsSubDirectories);
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("createBucketForFileIdx " + filesIdx + 
": final path " + fsp.finalPaths[filesIdx]
           + "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath 
+ ", tmp path "
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
index b53205a..5ca7a04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
@@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFileKeyWrapper;
 import org.apache.hadoop.hive.ql.io.orc.OrcFileValueWrapper;
@@ -54,7 +57,7 @@ public class OrcFileMergeOperator extends
   private TypeDescription fileSchema;
   private int rowIndexStride = 0;
 
-  private Writer outWriter;
+  private Map<Integer, Writer> outWriters = new HashMap<>();
   private Path prevPath;
   private Reader reader;
   private FSDataInputStream fdis;
@@ -113,7 +116,11 @@ public class OrcFileMergeOperator extends
 
       // store the orc configuration from the first file. All other files 
should
       // match this configuration before merging else will not be merged
-      if (outWriter == null) {
+      int bucketId = 0;
+      if (conf.getIsCompactionTable()) {
+        bucketId = AcidUtils.parseBucketId(new Path(filePath));
+      }
+      if (outWriters.get(bucketId) == null) {
         compression = k.getCompression();
         compressBuffSize = k.getCompressBufferSize();
         fileVersion = k.getFileVersion();
@@ -134,7 +141,10 @@ public class OrcFileMergeOperator extends
         }
 
         Path outPath = getOutPath();
-        outWriter = OrcFile.createWriter(outPath, options);
+        if (conf.getIsCompactionTable()) {
+          outPath = getOutPath(bucketId);
+        }
+        outWriters.put(bucketId, OrcFile.createWriter(outPath, options));
         if (LOG.isDebugEnabled()) {
           LOG.info("ORC merge file output path: " + outPath);
         }
@@ -157,7 +167,7 @@ public class OrcFileMergeOperator extends
           (int) v.getStripeInformation().getLength());
 
       // append the stripe buffer to the new ORC file
-      outWriter.appendStripe(buffer, 0, buffer.length, 
v.getStripeInformation(),
+      outWriters.get(bucketId).appendStripe(buffer, 0, buffer.length, 
v.getStripeInformation(),
           v.getStripeStatistics());
 
       if (LOG.isInfoEnabled()) {
@@ -169,7 +179,7 @@ public class OrcFileMergeOperator extends
 
       // add user metadata to footer in case of any
       if (v.isLastStripeInFile()) {
-        outWriter.appendUserMetadata(v.getUserMetadata());
+        outWriters.get(bucketId).appendUserMetadata(v.getUserMetadata());
       }
     } catch (Throwable e) {
       exception = true;
@@ -252,9 +262,12 @@ public class OrcFileMergeOperator extends
         fdis = null;
       }
 
-      if (outWriter != null) {
-        outWriter.close();
-        outWriter = null;
+      if (outWriters != null) {
+        for (Writer outWriter : outWriters.values()) {
+          outWriter.close();
+        }
       }
     } catch (Exception e) {
       throw new HiveException("Unable to close OrcFileMergeOperator", e);
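
The change above replaces the single outWriter with a map keyed by bucketId, so stripes
from different buckets are appended to different output files. A minimal standalone sketch
of that pattern follows; the Sink interface is a stand-in for the ORC Writer and is not a
Hive type.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only: one writer per bucket id, created lazily, closed together.
    public class PerBucketWriterSketch {
      interface Sink {
        void append(String stripe);
        void close();
      }

      private final Map<Integer, Sink> writers = new HashMap<>();

      void process(int bucketId, String stripe) {
        // create the writer for this bucket when its first stripe arrives
        Sink sink = writers.computeIfAbsent(bucketId, id -> new Sink() {
          public void append(String s) { System.out.println("bucket " + id + ": " + s); }
          public void close()          { System.out.println("closed bucket " + id); }
        });
        sink.append(stripe);
      }

      void closeAll() {
        writers.values().forEach(Sink::close);
        writers.clear();
      }

      public static void main(String[] args) {
        PerBucketWriterSketch op = new PerBucketWriterSketch();
        op.process(0, "stripe-a");
        op.process(1, "stripe-b");
        op.process(0, "stripe-c");  // appended to bucket 0's writer, never mixed into bucket 1's file
        op.closeAll();
      }
    }
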
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index a2dbeeb..044ce5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2372,7 +2372,7 @@ public class Hive {
           FileSystem fs = destPath.getFileSystem(conf);
           copyFiles(conf, loadPath, destPath, fs, isSrcLocal, 
isAcidIUDoperation,
               (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles,
-              tbl.getNumBuckets() > 0, isFullAcidTable, isManaged);
+              tbl.getNumBuckets() > 0, isFullAcidTable, isManaged, false);
         }
       }
       perfLogger.PerfLogEnd("MoveTask", PerfLogger.FILE_MOVES);
@@ -3074,6 +3074,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
     boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl);
     boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
+    boolean isCompactionTable = 
AcidUtils.isCompactionTable(tbl.getParameters());
 
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
@@ -3129,7 +3130,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
           FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
           copyFiles(conf, loadPath, destPath, fs, isSrcLocal, 
isAcidIUDoperation,
               loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles,
-              tbl.getNumBuckets() > 0, isFullAcidTable, isManaged);
+              tbl.getNumBuckets() > 0, isFullAcidTable, isManaged, 
isCompactionTable);
         } catch (IOException e) {
           throw new HiveException("addFiles: filesystem error in check phase", 
e);
         }
@@ -4089,7 +4090,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
   private static void copyFiles(final HiveConf conf, final FileSystem destFs,
             FileStatus[] srcs, final FileSystem srcFs, final Path destf,
             final boolean isSrcLocal, boolean isOverwrite,
-            final List<Path> newFiles, boolean acidRename, boolean isManaged) 
throws HiveException {
+            final List<Path> newFiles, boolean acidRename, boolean isManaged,
+            boolean isCompactionTable) throws HiveException {
 
     final HdfsUtils.HadoopFileStatus fullDestStatus;
     try {
@@ -4112,8 +4114,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
     // Sort the files
     Arrays.sort(srcs);
     String configuredOwner = HiveConf.getVar(conf, 
ConfVars.HIVE_LOAD_DATA_OWNER);
+    FileStatus[] files;
     for (FileStatus src : srcs) {
-      FileStatus[] files;
       if (src.isDirectory()) {
         try {
           files = srcFs.listStatus(src.getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
@@ -4127,6 +4129,30 @@ private void constructOneLBLocationMap(FileStatus fSta,
         files = new FileStatus[] {src};
       }
 
+      if (isCompactionTable) {
+        // Helper tables used for query-based compaction have a special file 
structure after
+        // filesink: tmpdir/attemptid/bucketid.
+        // We don't care about the attemptId anymore and don't want it in the 
table's final
+        // structure so just move the bucket files.
+        try {
+          List<FileStatus> fileStatuses = new ArrayList<>();
+          for (FileStatus file : files) {
+            if (file.isDirectory() && 
AcidUtils.originalBucketFilter.accept(file.getPath())) {
+              FileStatus[] taskDir = srcFs.listStatus(file.getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER);
+              fileStatuses.addAll(Arrays.asList(taskDir));
+            } else {
+              fileStatuses.add(file);
+            }
+          }
+          files = fileStatuses.toArray(new FileStatus[0]);
+        } catch (IOException e) {
+          if (null != pool) {
+            pool.shutdownNow();
+          }
+          throw new HiveException(e);
+        }
+      }
+
       final SessionState parentSession = SessionState.get();
       // Sort the files
       Arrays.sort(files);
@@ -4645,12 +4671,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param newFiles if this is non-null, a list of files that were created as 
a result of this
    *                 move will be returned.
    * @param isManaged if table is managed.
+   * @param isCompactionTable is table used in query-based compaction
    * @throws HiveException
    */
   static protected void copyFiles(HiveConf conf, Path srcf, Path destf, 
FileSystem fs,
-                                  boolean isSrcLocal, boolean isAcidIUD,
-                                  boolean isOverwrite, List<Path> newFiles, 
boolean isBucketed,
-                                  boolean isFullAcidTable, boolean isManaged) 
throws HiveException {
+      boolean isSrcLocal, boolean isAcidIUD, boolean isOverwrite, List<Path> 
newFiles, boolean isBucketed,
+      boolean isFullAcidTable, boolean isManaged, boolean isCompactionTable) 
throws HiveException {
     try {
       // create the destination if it does not exist
       if (!fs.exists(destf)) {
@@ -4686,7 +4712,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       // i.e, like 000000_0, 000001_0_copy_1, 000002_0.gz etc.
       // The extension is only maintained for files which are compressed.
       copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, isOverwrite,
-              newFiles, isFullAcidTable && !isBucketed, isManaged);
+              newFiles, isFullAcidTable && !isBucketed, isManaged, 
isCompactionTable);
     }
   }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 1ea3bd3..b46d2ec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat;
@@ -1671,6 +1672,8 @@ public final class GenMapRedUtils {
       fmd = new OrcFileMergeDesc();
     }
     fmd.setIsMmTable(fsInputDesc.isMmTable());
+    boolean isCompactionTable = 
AcidUtils.isCompactionTable(tblDesc.getProperties());
+    fmd.setIsCompactionTable(isCompactionTable);
     fmd.setWriteId(fsInputDesc.getTableWriteId());
     int stmtId = fsInputDesc.getStatementId();
     fmd.setStmtId(stmtId == -1 ? 0 : stmtId);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
index 6bd0053..0275da5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
@@ -31,6 +31,7 @@ public class FileMergeDesc extends AbstractOperatorDesc {
   private Long writeId;
   private int stmtId;
   private boolean isMmTable;
+  private boolean isCompactionTable;
 
   public FileMergeDesc(DynamicPartitionCtx dynPartCtx, Path outputDir) {
     this.dpCtx = dynPartCtx;
@@ -100,4 +101,12 @@ public class FileMergeDesc extends AbstractOperatorDesc {
   public void setIsMmTable(boolean isMmTable) {
     this.isMmTable = isMmTable;
   }
+
+  public boolean getIsCompactionTable() {
+    return isCompactionTable;
+  }
+
+  public void setIsCompactionTable(boolean isCompactionTable) {
+    this.isCompactionTable = isCompactionTable;
+  }
 }
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java 
b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
index a0c23b6..f9c10f5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
@@ -83,7 +83,8 @@ public class TestHiveCopyFiles {
     FileSystem targetFs = targetPath.getFileSystem(hiveConf);
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, 
isSourceLocal, NO_ACID, false,null, false, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, 
isSourceLocal, NO_ACID, false,
+          null, false, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -107,7 +108,8 @@ public class TestHiveCopyFiles {
     FileSystem targetFs = targetPath.getFileSystem(hiveConf);
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, 
isSourceLocal, NO_ACID, false, null, false, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, 
isSourceLocal, NO_ACID, false, null,
+          false, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -127,7 +129,8 @@ public class TestHiveCopyFiles {
     sourceFolder.newFile("000001_0.gz");
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, 
isSourceLocal, NO_ACID, false, null, false, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, 
isSourceLocal, NO_ACID, false, null,
+          false, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -158,7 +161,8 @@ public class TestHiveCopyFiles {
     Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + 
targetPath.toUri().getPath()));
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, 
isSourceLocal, NO_ACID, false, null, false, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, 
isSourceLocal, NO_ACID, false, null, false, false, false,
+          false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -185,7 +189,8 @@ public class TestHiveCopyFiles {
     Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + 
targetPath.toUri().getPath()));
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, 
isSourceLocal, NO_ACID, false, null, false, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, 
isSourceLocal, NO_ACID, false, null,
+          false, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -205,7 +210,8 @@ public class TestHiveCopyFiles {
     sourceFolder.newFile("000001_0.gz");
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, 
isSourceLocal, NO_ACID, false, null, false, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, 
isSourceLocal, NO_ACID, false, null,
+          false, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
