This is an automated email from the ASF dual-hosted git repository.

klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new c6ebd72  HIVE-23703: Major QB compaction with multiple FileSinkOperators results in data loss and one original file (Karen Coppage reviewed by Marta Kuczora)

c6ebd72 is described below

commit c6ebd72a54c24671532e5024e0f430bb677e9bbe
Author: Karen Coppage <karenlcopp...@gmail.com>
AuthorDate: Tue Jun 23 11:02:44 2020 +0200

    HIVE-23703: Major QB compaction with multiple FileSinkOperators results in data loss and one original file (Karen Coppage reviewed by Marta Kuczora)

    Closes #1134
---
 .../hive/ql/txn/compactor/CompactorOnTezTest.java  |   4 +-
 .../ql/txn/compactor/TestCrudCompactorOnMr.java    |  30 ++
 .../ql/txn/compactor/TestCrudCompactorOnTez.java   | 577 ++++++++++++++++-----
 .../hive/ql/txn/compactor/TestMmCompactorOnMr.java |  28 +
 .../ql/txn/compactor/TestMmCompactorOnTez.java     |   2 +-
 .../hive/ql/exec/AbstractFileMergeOperator.java    |  11 +
 .../hadoop/hive/ql/exec/FileSinkOperator.java      |  19 +-
 .../hadoop/hive/ql/exec/OrcFileMergeOperator.java  |  29 +-
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |  42 +-
 .../hadoop/hive/ql/optimizer/GenMapRedUtils.java   |   3 +
 .../apache/hadoop/hive/ql/plan/FileMergeDesc.java  |   9 +
 .../hadoop/hive/ql/metadata/TestHiveCopyFiles.java |  18 +-
 12 files changed, 604 insertions(+), 168 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index 05d72ba..71232de 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -52,6 +52,7 @@ public class CompactorOnTezTest {
   protected HiveConf conf;
   protected IMetaStoreClient msClient;
   protected IDriver driver;
+  protected boolean runsOnTez = true;

   @Before
   // Note: we create a new conf and driver object before every test
@@ -274,7 +275,8 @@ public class CompactorOnTezTest {
     protected List<String> getBucketData(String tblName, String bucketId) throws Exception {
       return executeStatementOnDriverAndReturnResults(
-          "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order by ROW__ID", driver);
+          "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order"
+              + " by a, b", driver);
     }

     protected void dropTable(String tblName) throws Exception {
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnMr.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnMr.java
new file mode 100644
index 0000000..c60ff1d
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnMr.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.Before; + +@SuppressWarnings("deprecation") +public class TestCrudCompactorOnMr extends TestCrudCompactorOnTez { + @Before + public void setMr() { + driver.getConf().setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr"); + runsOnTez = false; + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java index 4fb7860..2cd98ae 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.List; import com.google.common.collect.Lists; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; @@ -43,12 +42,19 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.DriverFactory; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hive.streaming.HiveStreamingConnection; import org.apache.hive.streaming.StreamingConnection; import org.apache.hive.streaming.StrictDelimitedInputWriter; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver; @@ -60,11 +66,75 @@ import static org.mockito.Mockito.mock; public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test - public void testMajorCompaction() throws Exception { + public void testMajorCompactionNotPartitionedWithoutBuckets() throws Exception { + String dbName = "default"; + String tblName = "testMajorCompaction"; + TestDataProvider testDataProvider = new TestDataProvider(); + testDataProvider.createFullAcidTable(tblName, false, false); + testDataProvider.insertTestData(tblName); + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + FileSystem fs = FileSystem.get(conf); + // Verify deltas (delta_0000001_0000001_0000, delta_0000002_0000002_0000) are present + Assert.assertEquals("Delta directories does not match before compaction", + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", + "delta_0000004_0000004_0000"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); + // Verify that delete delta 
(delete_delta_0000003_0000003_0000) is present + Assert.assertEquals("Delete directories does not match", + Arrays.asList("delete_delta_0000003_0000003_0000", "delete_delta_0000005_0000005_0000"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null)); + + List<String> expectedRsBucket0 = new ArrayList<>(Arrays.asList( + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t2\t3", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t2\t4", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t3\t4", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":4}\t4\t3", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":5}\t4\t4", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t5\t4", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t2", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":4}\t6\t3", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":5}\t6\t4")); + // Check bucket contents + Assert.assertEquals("pre-compaction bucket 0", expectedRsBucket0, + testDataProvider.getBucketData(tblName, "536870912")); + + // Run major compaction and cleaner + CompactorTestUtil.runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true); + CompactorTestUtil.runCleaner(conf); + verifySuccessfulCompaction(1); + // Should contain only one base directory now + String expectedBase = "base_0000005_v0000009"; + Assert.assertEquals("Base directory does not match after major compaction", + Collections.singletonList(expectedBase), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null)); + // Check base dir contents + List<String> expectedBucketFiles = Arrays.asList("bucket_00000"); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil + .getBucketFileNames(fs, table, null, expectedBase)); + // Check bucket contents + Assert.assertEquals("post-compaction bucket 0", expectedRsBucket0, + testDataProvider.getBucketData(tblName, "536870912")); + // Check bucket file contents + checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 0); + } + + /** + * TestDataProvider uses 2 buckets, I want to test 4 buckets here. 
+ * @throws Exception + */ + @Test + public void testMajorCompactionNotPartitioned4Buckets() throws Exception { String dbName = "default"; String tblName = "testMajorCompaction"; executeStatementOnDriver("drop table if exists " + tblName, driver); - executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered by (a) into 2 buckets" + executeStatementOnDriver("create transactional table " + tblName + " (a int, b int) clustered" + + " by (a) into 4 buckets" + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true'," + " 'transactional_properties'='default')", driver); executeStatementOnDriver("insert into " + tblName + " values(1,2),(1,3),(1,4),(2,2),(2,3),(2,4)", driver); @@ -72,77 +142,270 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); // Find the location of the table IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); FileSystem fs = FileSystem.get(conf); // Verify deltas (delta_0000001_0000001_0000, delta_0000002_0000002_0000) are present - FileStatus[] filestatus = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter); - String[] deltas = new String[filestatus.length]; - for (int i = 0; i < deltas.length; i++) { - deltas[i] = filestatus[i].getPath().getName(); - } - Arrays.sort(deltas); - String[] expectedDeltas = new String[] { "delta_0000001_0000001_0000", "delta_0000002_0000002_0000" }; - if (!Arrays.deepEquals(expectedDeltas, deltas)) { - Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); - } + Assert.assertEquals("Delta directories does not match before compaction", + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); // Verify that delete delta (delete_delta_0000003_0000003_0000) is present - FileStatus[] deleteDeltaStat = fs.listStatus(new Path(table.getSd().getLocation()), - AcidUtils.deleteEventDeltaDirFilter); - String[] deleteDeltas = new String[deleteDeltaStat.length]; - for (int i = 0; i < deleteDeltas.length; i++) { - deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); - } - Arrays.sort(deleteDeltas); - String[] expectedDeleteDeltas = new String[] { "delete_delta_0000003_0000003_0000" }; - if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { - Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); + Assert.assertEquals("Delete directories does not match", + Arrays.asList("delete_delta_0000003_0000003_0000"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null)); + List<String> expectedRsBucket0 = new ArrayList<>(Arrays.asList( + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4" + )); + List<String> expectedRsBucket1 = new ArrayList<>(Arrays.asList( + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t1\t4", + "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3", + "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":2}\t4\t4" + )); + List<String> expectedRsBucket2 = new ArrayList<>(Arrays.asList( + "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":1}\t3\t3", + "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":2}\t3\t4" + )); + 
TestDataProvider testDataProvider = new TestDataProvider(); + List<String> preCompactionRsBucket0 = testDataProvider.getBucketData(tblName, "536870912"); + List<String> preCompactionRsBucket1 = testDataProvider.getBucketData(tblName, "536936448"); + List<String> preCompactionRsBucket2 = testDataProvider.getBucketData(tblName, "537001984"); + if (runsOnTez) { // Check bucket contents + Assert.assertEquals("pre-compaction bucket 0", expectedRsBucket0, preCompactionRsBucket0); + Assert.assertEquals("pre-compaction bucket 1", expectedRsBucket1, preCompactionRsBucket1); + Assert.assertEquals("pre-compaction bucket 2", expectedRsBucket2, preCompactionRsBucket2); + } else { + // MR sometimes inserts rows in the opposite order from Tez, so rowids won't match. so we + // just check whether the bucket contents change during compaction. + expectedRsBucket0 = preCompactionRsBucket0; + expectedRsBucket1 = preCompactionRsBucket1; + expectedRsBucket2 = preCompactionRsBucket2; } - List<String> expectedRsBucket0 = new ArrayList<>(); - expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3"); - expectedRsBucket0.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4"); - expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3"); - expectedRsBucket0.add("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t3\t4"); - List<String> expectedRsBucket1 = new ArrayList<>(); - expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3"); - expectedRsBucket1.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t1\t4"); - expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3"); - expectedRsBucket1.add("{\"writeid\":2,\"bucketid\":536936448,\"rowid\":2}\t4\t4"); - // Bucket 0 - List<String> rsBucket0 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName - + " where ROW__ID.bucketid = 536870912 order by ROW__ID", driver); - Assert.assertEquals("normal read", expectedRsBucket0, rsBucket0); - // Bucket 1 - List<String> rsBucket1 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName - + " where ROW__ID.bucketid = 536936448 order by ROW__ID", driver); - Assert.assertEquals("normal read", expectedRsBucket1, rsBucket1); + // Run major compaction and cleaner CompactorTestUtil.runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true); CompactorTestUtil.runCleaner(conf); + verifySuccessfulCompaction(1); // Should contain only one base directory now - filestatus = fs.listStatus(new Path(table.getSd().getLocation())); - String[] bases = new String[filestatus.length]; - for (int i = 0; i < bases.length; i++) { - bases[i] = filestatus[i].getPath().getName(); + String expectedBase = "base_0000003_v0000009"; + Assert.assertEquals("Base directory does not match after major compaction", + Collections.singletonList(expectedBase), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null)); + // Check files in base + List<String> expectedBucketFiles = Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil + .getBucketFileNames(fs, table, null, "base_0000003_v0000009")); + // Check buckets contents + Assert.assertEquals("post-compaction bucket 0", expectedRsBucket0, testDataProvider.getBucketData(tblName, + "536870912")); + Assert.assertEquals("post-compaction bucket 1", expectedRsBucket1, testDataProvider.getBucketData(tblName, + 
"536936448")); + Assert.assertEquals("post-compaction bucket 2", expectedRsBucket2, testDataProvider.getBucketData(tblName, + "537001984")); + // Check bucket file contents + checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 0); + checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 1); + checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 2); + } + + @Test + public void testMajorCompactionPartitionedWithoutBuckets() throws Exception { + String dbName = "default"; + String tblName = "testMajorCompaction"; + TestDataProvider testDataProvider = new TestDataProvider(); + testDataProvider.createFullAcidTable(tblName, true, false); + testDataProvider.insertTestDataPartitioned(tblName); + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tblName); + String tablePath = table.getSd().getLocation(); + String partitionToday = "ds=today"; + String partitionTomorrow = "ds=tomorrow"; + String partitionYesterday = "ds=yesterday"; + Path todayPath = new Path(tablePath, partitionToday); + Path tomorrowPath = new Path(tablePath, partitionTomorrow); + Path yesterdayPath = new Path(tablePath, partitionYesterday); + FileSystem fs = FileSystem.get(conf); + // Verify deltas + Assert.assertEquals("Delta directories does not match", + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000004_0000004_0000"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday)); + // Verify delete delta + Assert.assertEquals("Delete directories does not match", + Arrays.asList("delete_delta_0000003_0000003_0000", "delete_delta_0000005_0000005_0000"), + CompactorTestUtil + .getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, partitionToday)); + + List<String> expectedRsBucket0 = new ArrayList<>(Arrays.asList( + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3\tyesterday", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4\ttoday", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4\tyesterday", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t4\t3\ttomorrow", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t4\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2\tyesterday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t6\t2\ttoday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t6\t3\ttoday", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t4\ttoday")); + if (runsOnTez) { // Check bucket contents + Assert.assertEquals("pre-compaction bucket 0", expectedRsBucket0, + testDataProvider.getBucketData(tblName, "536870912")); + } else { + // MR sometimes inserts rows in the opposite order from Tez, so rowids won't match. so we + // just check whether the bucket contents change during compaction. 
+ expectedRsBucket0 = testDataProvider.getBucketData(tblName, "536870912"); } - Arrays.sort(bases); - String[] expectedBases = new String[] { "base_0000003_v0000008" }; - if (!Arrays.deepEquals(expectedBases, bases)) { - Assert.fail("Expected: " + Arrays.toString(expectedBases) + ", found: " + Arrays.toString(bases)); + + // Run major compaction and cleaner for all 3 partitions + CompactorTestUtil.runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true, + partitionToday, partitionTomorrow, partitionYesterday); + CompactorTestUtil.runCleaner(conf); + // 3 compaction should be in the response queue with succeeded state + verifySuccessfulCompaction( 3); + // Should contain only one base directory now + Assert.assertEquals("Base directory does not match after major compaction", + Collections.singletonList("base_0000005_v0000009"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, partitionToday)); + Assert.assertEquals("Base directory does not match after major compaction", + Collections.singletonList("base_0000005_v0000013"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, partitionTomorrow)); + Assert.assertEquals("Base directory does not match after major compaction", + Collections.singletonList("base_0000005_v0000017"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, partitionYesterday)); + // Check base dir contents + List<String> expectedBucketFiles = Arrays.asList("bucket_00000"); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil + .getBucketFileNames(fs, table, partitionToday, "base_0000005_v0000009")); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil + .getBucketFileNames(fs, table, partitionTomorrow, "base_0000005_v0000013")); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil + .getBucketFileNames(fs, table, partitionYesterday, "base_0000005_v0000017")); + // Check buckets contents + Assert.assertEquals("post-compaction bucket 0", expectedRsBucket0, + testDataProvider.getBucketData(tblName, "536870912")); + // Check bucket file contents + checkBucketIdAndRowIdInAcidFile(fs, new Path(todayPath, "base_0000005_v0000009"), 0); + checkBucketIdAndRowIdInAcidFile(fs, new Path(tomorrowPath, "base_0000005_v0000013"), 0); + checkBucketIdAndRowIdInAcidFile(fs, new Path(yesterdayPath, "base_0000005_v0000017"), 0); + } + + @Test public void testMajorCompactionPartitionedWithBuckets() throws Exception { + String dbName = "default"; + String tableName = "testMajorCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createFullAcidTable(tableName, true, true); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + dataProvider.insertTestDataPartitioned(tableName); + // Get all data before compaction is run + List<String> expectedData = dataProvider.getAllData(tableName); + // Verify deltas + String partitionToday = "ds=today"; + String partitionTomorrow = "ds=tomorrow"; + String partitionYesterday = "ds=yesterday"; + Assert.assertEquals("Delta directories does not match", + Arrays.asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "delta_0000004_0000004_0000"), + 
CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday)); + // Verify delete delta + Assert.assertEquals("Delete directories does not match", + Arrays.asList("delete_delta_0000003_0000003_0000", "delete_delta_0000005_0000005_0000"), + CompactorTestUtil + .getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, partitionToday)); + // Check bucket contents + List<String> expectedRsBucket0 = Arrays.asList( + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4\tyesterday"); + List<String> rsBucket0 = dataProvider.getBucketData(tableName, "536870912"); + + List<String> expectedRsBucket1 = Arrays.asList( + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t3\tyesterday", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4\ttoday", + "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t3\ttomorrow", + "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t2\tyesterday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t6\t2\ttoday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":2}\t6\t3\ttoday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":3}\t6\t4\ttoday"); + List<String> rsBucket1 = dataProvider.getBucketData(tableName, "536936448"); + if (runsOnTez) { + Assert.assertEquals(expectedRsBucket0, rsBucket0); + Assert.assertEquals(expectedRsBucket1, rsBucket1); + } else { + // MR sometimes inserts rows in the opposite order from Tez, so rowids won't match. so we + // just check whether the bucket contents change during compaction. 
+ expectedRsBucket0 = rsBucket0; + expectedRsBucket1 = rsBucket1; } + + // Run a compaction + CompactorTestUtil + .runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true, partitionToday, + partitionTomorrow, + partitionYesterday); + // Clean up resources + CompactorTestUtil.runCleaner(conf); + // 3 compactions should be in the response queue with succeeded state + verifySuccessfulCompaction( 3); + // Verify base directories after compaction in each partition + String expectedBaseToday = "base_0000005_v0000011"; + String expectedBaseTomorrow = "base_0000005_v0000015"; + String expectedBaseYesterday = "base_0000005_v0000019"; + List<String> baseDeltasInToday = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, partitionToday); + Assert.assertEquals("Delta directories does not match after compaction", + Collections.singletonList(expectedBaseToday), baseDeltasInToday); + List<String> baseDeltasInTomorrow = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, partitionTomorrow); + Assert.assertEquals("Delta directories does not match after compaction", + Collections.singletonList(expectedBaseTomorrow), baseDeltasInTomorrow); + List<String> baseDeltasInYesterday = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, partitionYesterday); + Assert.assertEquals("Delta directories does not match after compaction", + Collections.singletonList(expectedBaseYesterday), baseDeltasInYesterday); + // Verify contents of bases + Assert.assertEquals("Bucket names are not matching after compaction", Arrays.asList("bucket_00000", "bucket_00001"), + CompactorTestUtil + .getBucketFileNames(fs, table, partitionToday, expectedBaseToday)); + Assert.assertEquals("Bucket names are not matching after compaction", Arrays.asList("bucket_00001"), + CompactorTestUtil + .getBucketFileNames(fs, table, partitionTomorrow, expectedBaseTomorrow)); + Assert.assertEquals("Bucket names are not matching after compaction", Arrays.asList("bucket_00000", "bucket_00001"), + CompactorTestUtil + .getBucketFileNames(fs, table, partitionYesterday, expectedBaseYesterday)); + // Verify contents of bucket files. 
// Bucket 0 - List<String> rsCompactBucket0 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName - + " where ROW__ID.bucketid = 536870912", driver); - Assert.assertEquals("compacted read", rsBucket0, rsCompactBucket0); + rsBucket0 = dataProvider.getBucketData(tableName, "536870912"); + Assert.assertEquals(expectedRsBucket0, rsBucket0); // Bucket 1 - List<String> rsCompactBucket1 = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + tblName - + " where ROW__ID.bucketid = 536936448", driver); - Assert.assertEquals("compacted read", rsBucket1, rsCompactBucket1); - // Clean up - executeStatementOnDriver("drop table " + tblName, driver); + rsBucket1 = dataProvider.getBucketData(tableName, "536936448"); + Assert.assertEquals(expectedRsBucket1, rsBucket1); + // Verify all contents + List<String> actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + String tablePath = table.getSd().getLocation(); + checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionToday), expectedBaseToday), 0); + checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionToday), expectedBaseToday), 1); + checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionTomorrow), expectedBaseTomorrow), 1); + checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionYesterday), expectedBaseYesterday), 0); + checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionYesterday), expectedBaseYesterday), 1); } @Test public void testMinorCompactionNotPartitionedWithoutBuckets() throws Exception { + Assume.assumeTrue(runsOnTez); String dbName = "default"; String tableName = "testMinorCompaction"; // Create test table @@ -169,10 +432,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { // Clean up resources CompactorTestUtil.runCleaner(conf); // Only 1 compaction should be in the response queue with succeeded state - List<ShowCompactResponseElement> compacts = - TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size()); - Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState()); + verifySuccessfulCompaction(1); // Verify delta directories after compaction List<String> actualDeltasAfterComp = CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); @@ -190,7 +450,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeleteDeltasAfterComp.get(0))); // Verify contents of bucket files. 
// Bucket 0 - List<String> expectedRsBucket0 = Arrays.asList("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t2\t3", + List<String> expectedRsBucket0 = Arrays.asList( + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t2\t3", "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t2\t4", "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3", "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t3\t4", @@ -213,6 +474,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test public void testMinorCompactionNotPartitionedWithBuckets() throws Exception { + Assume.assumeTrue(runsOnTez); String dbName = "default"; String tableName = "testMinorCompaction"; // Create test table @@ -239,10 +501,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { // Clean up resources CompactorTestUtil.runCleaner(conf); // Only 1 compaction should be in the response queue with succeeded state - List<ShowCompactResponseElement> compacts = - TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size()); - Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState()); + verifySuccessfulCompaction(1); // Verify delta directories after compaction List<String> actualDeltasAfterComp = CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); @@ -260,12 +519,14 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeleteDeltasAfterComp.get(0))); // Verify contents of bucket files. // Bucket 0 - List<String> expectedRsBucket0 = Arrays.asList("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3", + List<String> expectedRsBucket0 = Arrays.asList( + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t3", "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t3\t4"); List<String> rsBucket0 = dataProvider.getBucketData(tableName, "536870912"); Assert.assertEquals(expectedRsBucket0, rsBucket0); // Bucket 1 - List<String> expectedRs1Bucket = Arrays.asList("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t3", + List<String> expectedRs1Bucket = Arrays.asList( + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t3", "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t2\t4", "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t3", "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":2}\t4\t4", @@ -286,6 +547,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test public void testMinorCompactionPartitionedWithoutBuckets() throws Exception { + Assume.assumeTrue(runsOnTez); String dbName = "default"; String tableName = "testMinorCompaction"; // Create test table @@ -318,10 +580,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { // Clean up resources CompactorTestUtil.runCleaner(conf); // 3 compaction should be in the response queue with succeeded state - List<ShowCompactResponseElement> compacts = - TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals("Completed compaction queue must contain 3 element", 3, compacts.size()); - compacts.forEach(c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState())); + verifySuccessfulCompaction(3); // Verify delta directories after compaction in each partition List<String> actualDeltasAfterCompPartToday = CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, 
partitionToday); @@ -340,23 +599,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, CompactorTestUtil .getBucketFileNames(fs, table, partitionToday, actualDeleteDeltasAfterCompPartToday.get(0))); - // Verify contents of bucket files. - // Bucket 0 - List<String> expectedRsBucket0 = Arrays - .asList("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3\tyesterday", - "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4\ttoday", - "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday", - "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4\tyesterday", - "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t4\t3\ttomorrow", - "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t4\t4\ttoday", - "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t4\ttoday", - "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2\tyesterday", - "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t6\t2\ttoday", - "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday", - "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t6\t3\ttoday", - "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t4\ttoday"); - List<String> rsBucket0 = dataProvider.getBucketData(tableName, "536870912"); - Assert.assertEquals(expectedRsBucket0, rsBucket0); + // Verify all contents List<String> actualData = dataProvider.getAllData(tableName); Assert.assertEquals(expectedData, actualData); @@ -366,6 +609,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test public void testMinorCompactionPartitionedWithBuckets() throws Exception { + Assume.assumeTrue(runsOnTez); String dbName = "default"; String tableName = "testMinorCompaction"; // Create test table @@ -397,11 +641,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { partitionYesterday); // Clean up resources CompactorTestUtil.runCleaner(conf); - // Only 1 compaction should be in the response queue with succeeded state - List<ShowCompactResponseElement> compacts = - TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals("Completed compaction queue must contain 3 element", 3, compacts.size()); - compacts.forEach(c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState())); + // 3 compactions should be in the response queue with succeeded state + verifySuccessfulCompaction( 3); // Verify delta directories after compaction in each partition List<String> actualDeltasAfterCompPartToday = CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday); @@ -422,19 +663,21 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { .getBucketFileNames(fs, table, partitionToday, actualDeleteDeltasAfterCompPartToday.get(0))); // Verify contents of bucket files. 
// Bucket 0 - List<String> expectedRsBucket0 = Arrays.asList("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday", + List<String> expectedRsBucket0 = Arrays.asList( + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday", "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4\tyesterday"); List<String> rsBucket0 = dataProvider.getBucketData(tableName, "536870912"); Assert.assertEquals(expectedRsBucket0, rsBucket0); // Bucket 1 - List<String> expectedRsBucket1 = Arrays.asList("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4\ttoday", + List<String> expectedRsBucket1 = Arrays.asList( "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t3\tyesterday", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4\ttoday", "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t3\ttomorrow", "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4\ttoday", - "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t4\ttoday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t2\tyesterday", - "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t6\t2\ttoday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t4\ttoday", + "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t6\t2\ttoday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":2}\t6\t3\ttoday", "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":3}\t6\t4\ttoday"); List<String> rsBucket1 = dataProvider.getBucketData(tableName, "536936448"); @@ -448,6 +691,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test public void testMinorCompaction10DeltaDirs() throws Exception { + Assume.assumeTrue(runsOnTez); String dbName = "default"; String tableName = "testMinorCompaction"; // Create test table @@ -473,10 +717,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); // Clean up resources CompactorTestUtil.runCleaner(conf); - List<ShowCompactResponseElement> compacts = - TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals("Completed compaction queue must contain 3 element", 1, compacts.size()); - compacts.forEach(c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState())); + verifySuccessfulCompaction( 1); // Verify delta directories after compaction List<String> actualDeltasAfterComp = CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); @@ -503,6 +744,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test public void testMultipleMinorCompactions() throws Exception { + Assume.assumeTrue(runsOnTez); String dbName = "default"; String tableName = "testMinorCompaction"; // Create test table @@ -519,20 +761,15 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { // Clean up resources CompactorTestUtil.runCleaner(conf); // Only 1 compaction should be in the response queue with succeeded state - List<ShowCompactResponseElement> compacts = - TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size()); - Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState()); + verifySuccessfulCompaction(1); // Insert test data into test table dataProvider.insertTestData(tableName); // Run a compaction 
CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); // Clean up resources CompactorTestUtil.runCleaner(conf); - // 2 compaction should be in the response queue with succeeded state - compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals("Completed compaction queue must contain one element", 2, compacts.size()); - Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState()); + // 2 compactions should be in the response queue with succeeded state + verifySuccessfulCompaction(2); // Insert test data into test table dataProvider.insertTestData(tableName); // Run a compaction @@ -540,9 +777,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { // Clean up resources CompactorTestUtil.runCleaner(conf); // 3 compaction should be in the response queue with succeeded state - compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals("Completed compaction queue must contain one element", 3, compacts.size()); - Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(2).getState()); + verifySuccessfulCompaction(3); // Verify delta directories after compaction List<String> actualDeltasAfterComp = CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); @@ -557,6 +792,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test public void testMinorCompactionWhileStreaming() throws Exception { + Assume.assumeTrue(runsOnTez); String dbName = "default"; String tableName = "testMinorCompaction"; executeStatementOnDriver("drop table if exists " + tableName, driver); @@ -595,6 +831,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test public void testMinorCompactionWhileStreamingAfterAbort() throws Exception { + Assume.assumeTrue(runsOnTez); String dbName = "default"; String tableName = "testMinorCompaction"; executeStatementOnDriver("drop table if exists " + tableName, driver); @@ -620,6 +857,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test public void testMinorCompactionWhileStreamingWithAbort() throws Exception { + Assume.assumeTrue(runsOnTez); String dbName = "default"; String tableName = "testMinorCompaction"; executeStatementOnDriver("drop table if exists " + tableName, driver); @@ -646,6 +884,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test public void testMinorCompactionWhileStreamingWithAbortInMiddle() throws Exception { + Assume.assumeTrue(runsOnTez); String dbName = "default"; String tableName = "testMinorCompaction"; executeStatementOnDriver("drop table if exists " + tableName, driver); @@ -683,6 +922,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test public void testMajorCompactionAfterMinor() throws Exception { + Assume.assumeTrue(runsOnTez); String dbName = "default"; String tableName = "testMinorCompaction"; // Create test table @@ -702,10 +942,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { // Clean up resources CompactorTestUtil.runCleaner(conf); // Only 1 compaction should be in the response queue with succeeded state - List<ShowCompactResponseElement> compacts = - TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size()); - Assert.assertEquals("Compaction state is not succeeded", 
"succeeded", compacts.get(0).getState()); + verifySuccessfulCompaction(1); // Verify delta directories after compaction Assert.assertEquals("Delta directories does not match after minor compaction", Collections.singletonList("delta_0000001_0000005_v0000009"), @@ -725,9 +962,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { // Clean up resources CompactorTestUtil.runCleaner(conf); // 2 compaction should be in the response queue with succeeded state - compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals("Completed compaction queue must contain one element", 2, compacts.size()); - Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState()); + verifySuccessfulCompaction(2); // Verify base directory after compaction Assert.assertEquals("Base directory does not match after major compaction", Collections.singletonList("base_0000010_v0000029"), @@ -739,6 +974,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test public void testMinorCompactionAfterMajor() throws Exception { + Assume.assumeTrue(runsOnTez); String dbName = "default"; String tableName = "testMinorCompaction"; // Create test table @@ -758,10 +994,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { // Clean up resources CompactorTestUtil.runCleaner(conf); // Only 1 compaction should be in the response queue with succeeded state - List<ShowCompactResponseElement> compacts = - TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size()); - Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState()); + verifySuccessfulCompaction(1); // Verify base directory after compaction Assert.assertEquals("Base directory does not match after major compaction", Collections.singletonList("base_0000005_v0000009"), @@ -778,9 +1011,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { // Clean up resources CompactorTestUtil.runCleaner(conf); // 2 compaction should be in the response queue with succeeded state - compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals("Completed compaction queue must contain one element", 2, compacts.size()); - Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState()); + verifySuccessfulCompaction(2); // Verify base directory after compaction Assert.assertEquals("Base directory does not match after major compaction", Collections.singletonList("base_0000005_v0000009"), @@ -795,6 +1026,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test public void testMinorCompactionWhileStreamingWithSplitUpdate() throws Exception { + Assume.assumeTrue(runsOnTez); String dbName = "default"; String tableName = "testMinorCompaction"; executeStatementOnDriver("drop table if exists " + tableName, driver); @@ -851,10 +1083,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { + " partition (ds) values(3,2,1000,'yesterday'),(3,3,1001,'today'),(3,4,1002,'yesterday'),(4,2,1003,'today')," + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver); executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); - // Run major compaction and cleaner - CompactorTestUtil - .runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true, "ds=yesterday", "ds=today"); - 
CompactorTestUtil.runCleaner(conf); + List<String> expectedRsBucket0PtnToday = new ArrayList<>(); expectedRsBucket0PtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t3\tNULL\ttoday"); expectedRsBucket0PtnToday.add("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t4\tNULL\ttoday"); @@ -863,12 +1092,33 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { expectedRsBucket1PtnToday.add("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t1\t3\tNULL\ttoday"); expectedRsBucket1PtnToday.add("{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t4\t4\t1005\ttoday"); // Bucket 0, partition 'today' + List<String> rsBucket0PtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + + tblName + " where ROW__ID.bucketid = 536870912 and ds='today' order by a,b", driver); + // Bucket 1, partition 'today' + List<String> rsBucket1PtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " + + tblName + " where ROW__ID.bucketid = 536936448 and ds='today' order by a,b", driver); + if (runsOnTez) { + Assert.assertEquals("pre-compaction read", expectedRsBucket0PtnToday, rsBucket0PtnToday); + Assert.assertEquals("pre-compaction read", expectedRsBucket1PtnToday, rsBucket1PtnToday); + } else { + // MR sometimes inserts rows in the opposite order from Tez, so rowids won't match. so we + // just check whether the bucket contents change during compaction. + expectedRsBucket0PtnToday = rsBucket0PtnToday; + expectedRsBucket1PtnToday = rsBucket1PtnToday; + } + + // Run major compaction and cleaner + CompactorTestUtil + .runCompaction(conf, dbName, tblName, CompactionType.MAJOR, true, "ds=yesterday", "ds=today"); + CompactorTestUtil.runCleaner(conf); + + // Bucket 0, partition 'today' List<String> rsCompactBucket0PtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " - + tblName + " where ROW__ID.bucketid = 536870912 and ds='today'", driver); + + tblName + " where ROW__ID.bucketid = 536870912 and ds='today' order by a,b", driver); Assert.assertEquals("compacted read", expectedRsBucket0PtnToday, rsCompactBucket0PtnToday); // Bucket 1, partition 'today' List<String> rsCompactBucket1PtnToday = executeStatementOnDriverAndReturnResults("select ROW__ID, * from " - + tblName + " where ROW__ID.bucketid = 536936448 and ds='today'", driver); + + tblName + " where ROW__ID.bucketid = 536936448 and ds='today' order by a,b", driver); Assert.assertEquals("compacted read", expectedRsBucket1PtnToday, rsCompactBucket1PtnToday); // Clean up executeStatementOnDriver("drop table " + tblName, driver); @@ -953,7 +1203,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { // Run a compaction CompactorTestUtil.runCompaction(conf, dbName, tableName, compactionType, true); CompactorTestUtil.runCleaner(conf); - verifySuccessulTxn(1); + verifySuccessfulCompaction(1); // Verify directories after compaction PathFilter pathFilter = compactionType == CompactionType.MAJOR ? AcidUtils.baseFileFilter : AcidUtils.deltaFileFilter; @@ -973,19 +1223,70 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { /** * Verify that the expected number of transactions have run, and their state is "succeeded". 
* - * @param expectedCompleteCompacts number of compactions already run + * @param expectedSuccessfulCompactions number of compactions already run * @throws MetaException */ - private void verifySuccessulTxn(int expectedCompleteCompacts) throws MetaException { + private void verifySuccessfulCompaction(int expectedSuccessfulCompactions) throws MetaException { List<ShowCompactResponseElement> compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); - Assert.assertEquals("Completed compaction queue must contain one element", - expectedCompleteCompacts, compacts.size()); + Assert.assertEquals("Completed compaction queue must contain " + expectedSuccessfulCompactions + " element(s)", + expectedSuccessfulCompactions, compacts.size()); compacts.forEach( c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState())); } /** + * Read file, and + * 1. make sure that the bucket property in each row matches the file name. + * For example, if the bucketId is 0, we check file bucket_00000 to make sure that the third + * column contains only the value 536870912. + * 2. make sure that rowIds are in ascending order + * @param fs file system + * @param path where to look for the bucket file + * @param bucketId bucket Id to check, e.g. 0. + */ + private void checkBucketIdAndRowIdInAcidFile(FileSystem fs, Path path, int bucketId) throws IOException { + Path bucketFilePath = AcidUtils.createBucketFile(path, bucketId); + Reader orcReader = OrcFile.createReader(bucketFilePath, + OrcFile.readerOptions(fs.getConf()).filesystem(fs)); + TypeDescription schema = orcReader.getSchema(); + try (RecordReader rows = orcReader.rows()) { + VectorizedRowBatch batch = schema.createRowBatch(); + rows.nextBatch(batch); + // check that bucket property in each row matches the bucket in the file name + long[] bucketIdVector = ((LongColumnVector) batch.cols[2]).vector; + for (int i = 0; i < batch.count(); i++) { + Assert.assertEquals(bucketId, decodeBucketProperty(bucketIdVector[i])); + } + // check that writeIds, then rowIds are sorted in ascending order + long[] writeIdVector = ((LongColumnVector) batch.cols[1]).vector; + long[] rowIdVector = ((LongColumnVector) batch.cols[3]).vector; + long writeId = writeIdVector[0]; + long rowId = 0; + for (int i = 0; i < batch.count(); i++) { + long currentWriteId = writeIdVector[i]; + long currentRowId = rowIdVector[i]; + if (writeId == writeIdVector[i]) { + Assert.assertTrue(rowId <= currentRowId); + rowId = currentRowId; + } else { + Assert.assertTrue(writeId < currentWriteId); + writeId = currentWriteId; + rowId = 0; + } + } + } + } + + /** + * Couldn't find any way to get the bucket property from BucketCodec, so just reverse + * engineered the encoding. The actual bucketId is represented by bits 2-11 of 29 bits + */ + private int decodeBucketProperty(long bucketCodec) { + return (int) ((bucketCodec >> 16) & (0xFFF)); + } + + /** * Tests whether hive.llap.io.etl.skip.format config is handled properly whenever QueryCompactor#runCompactionQueries * is invoked. 
* @throws Exception diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnMr.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnMr.java new file mode 100644 index 0000000..7ec23f7 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnMr.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.Before; + +public class TestMmCompactorOnMr extends TestMmCompactorOnTez { + @Before + public void setMr() { + driver.getConf().setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr"); + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java index 451390a..9772bbe 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java @@ -45,7 +45,7 @@ import java.util.List; import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver; /** - * Test functionality of MmMinorQueryCompactor,. + * Test functionality of MmMinorQueryCompactor. */ public class TestMmCompactorOnTez extends CompactorOnTezTest { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index d8f8e72..87e6761 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.FileMergeDesc; @@ -116,6 +117,10 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> taskTmpPath = null; // Make sure we don't collide with the source. 
outPath = finalPath = new Path(tmpPath, taskId + ".merged"); + } else if (conf.getIsCompactionTable()) { + taskTmpPath = ttp; // _task_tmp + finalPath = tp; // _tmp + outPath = ttp; // also _task_tmp } else { taskTmpPath = ttp; finalPath = new Path(tp, taskId); @@ -373,6 +378,12 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> return outPath; } + protected final Path getOutPath(int bucketId) { + String fileName = AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId); + Path out = new Path(outPath, fileName); + return out; + } + protected final void addIncompatibleFile(Path path) { incompatFileSet.add(path); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 22a24f8..023f7df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -334,6 +334,18 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements if (!isMmTable && !isDirectInsert) { if (!bDynParts && !isSkewedStoredAsSubDirectories) { finalPaths[filesIdx] = new Path(parent, taskWithExt); + if (conf.isCompactionTable()) { + // Helper tables used for compaction are external and non-acid. We need to keep + // track of the taskId to avoid overwrites in the case of multiple + // FileSinkOperators, and the file names need to reflect the correct bucketId + // because the files will eventually be placed in an acid table, and the + // OrcFileMergeOperator should not merge data belonging to different buckets. + // Therefore during compaction, data will be stored in the final directory like: + // ${hive_staging_dir}/final_dir/taskid/bucketId + // For example, ${hive_staging dir}/-ext-10002/000000_0/bucket_00000 + finalPaths[filesIdx] = new Path(finalPaths[filesIdx], + AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId)); + } } else { finalPaths[filesIdx] = new Path(buildTmpPath(), taskWithExt); } @@ -801,12 +813,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveException { try { - if (conf.isCompactionTable()) { - fsp.initializeBucketPaths(filesIdx, AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId), - isNativeTable(), isSkewedStoredAsSubDirectories); - } else { - fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories); - } + fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx] + "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath + ", tmp path " diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java index b53205a..5ca7a04 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.ql.CompilationOpContext; @@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory; import 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
index b53205a..5ca7a04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
@@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
@@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFileKeyWrapper;
 import org.apache.hadoop.hive.ql.io.orc.OrcFileValueWrapper;
@@ -54,7 +57,7 @@ public class OrcFileMergeOperator extends
   private TypeDescription fileSchema;
   private int rowIndexStride = 0;
 
-  private Writer outWriter;
+  private Map<Integer, Writer> outWriters = new HashMap<>();
   private Path prevPath;
   private Reader reader;
   private FSDataInputStream fdis;
@@ -113,7 +116,11 @@
       // store the orc configuration from the first file. All other files should
       // match this configuration before merging else will not be merged
-      if (outWriter == null) {
+      int bucketId = 0;
+      if (conf.getIsCompactionTable()) {
+        bucketId = AcidUtils.parseBucketId(new Path(filePath));
+      }
+      if (outWriters.get(bucketId) == null) {
         compression = k.getCompression();
         compressBuffSize = k.getCompressBufferSize();
         fileVersion = k.getFileVersion();
@@ -134,7 +141,10 @@
         }
 
         Path outPath = getOutPath();
-        outWriter = OrcFile.createWriter(outPath, options);
+        if (conf.getIsCompactionTable()) {
+          outPath = getOutPath(bucketId);
+        }
+        outWriters.put(bucketId, OrcFile.createWriter(outPath, options));
         if (LOG.isDebugEnabled()) {
           LOG.info("ORC merge file output path: " + outPath);
         }
@@ -157,7 +167,7 @@
           (int) v.getStripeInformation().getLength());
 
       // append the stripe buffer to the new ORC file
-      outWriter.appendStripe(buffer, 0, buffer.length, v.getStripeInformation(),
+      outWriters.get(bucketId).appendStripe(buffer, 0, buffer.length, v.getStripeInformation(),
           v.getStripeStatistics());
 
       if (LOG.isInfoEnabled()) {
@@ -169,7 +179,7 @@
       // add user metadata to footer in case of any
       if (v.isLastStripeInFile()) {
-        outWriter.appendUserMetadata(v.getUserMetadata());
+        outWriters.get(bucketId).appendUserMetadata(v.getUserMetadata());
       }
     } catch (Throwable e) {
       exception = true;
@@ -252,9 +262,12 @@
         fdis = null;
       }
 
-      if (outWriter != null) {
-        outWriter.close();
-        outWriter = null;
+      if (outWriters != null) {
+        for (Map.Entry<Integer, Writer> outWriterEntry : outWriters.entrySet()) {
+          Writer outWriter = outWriterEntry.getValue();
+          outWriter.close();
+          outWriter = null;
+        }
       }
     } catch (Exception e) {
       throw new HiveException("Unable to close OrcFileMergeOperator", e);
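
The OrcFileMergeOperator change above replaces the single output writer with one writer per bucket id, created lazily and all closed at the end, so no bucket's stripes are dropped when several buckets flow through one merge operator. Below is a simplified, self-contained sketch of that bookkeeping; the writer interface is a stand-in, not the real ORC Writer API:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class PerBucketWriterSketch {

      interface StripeWriter extends Closeable {
        void append(byte[] stripe) throws IOException;
      }

      private final Map<Integer, StripeWriter> writers = new HashMap<>();

      void appendStripe(int bucketId, byte[] stripe) throws IOException {
        StripeWriter writer = writers.get(bucketId);
        if (writer == null) {
          // analogous to OrcFile.createWriter(getOutPath(bucketId), options) in the operator
          writer = newWriter(bucketId);
          writers.put(bucketId, writer);
        }
        writer.append(stripe);
      }

      void closeAll() throws IOException {
        // every bucket's writer is closed, not just the last one that was opened
        for (StripeWriter writer : writers.values()) {
          writer.close();
        }
        writers.clear();
      }

      private StripeWriter newWriter(int bucketId) {
        return new StripeWriter() {
          @Override public void append(byte[] stripe) { /* append stripe bytes for this bucket */ }
          @Override public void close() { /* write footer and release the file */ }
        };
      }
    }
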
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index a2dbeeb..044ce5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2372,7 +2372,7 @@ public class Hive {
         FileSystem fs = destPath.getFileSystem(conf);
         copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
             (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles,
-            tbl.getNumBuckets() > 0, isFullAcidTable, isManaged);
+            tbl.getNumBuckets() > 0, isFullAcidTable, isManaged, false);
       }
     }
     perfLogger.PerfLogEnd("MoveTask", PerfLogger.FILE_MOVES);
@@ -3074,6 +3074,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
     boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl);
     boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
+    boolean isCompactionTable = AcidUtils.isCompactionTable(tbl.getParameters());
 
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
@@ -3129,7 +3130,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
       copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
           loadFileType == LoadFileType.OVERWRITE_EXISTING, newFiles,
-          tbl.getNumBuckets() > 0, isFullAcidTable, isManaged);
+          tbl.getNumBuckets() > 0, isFullAcidTable, isManaged, isCompactionTable);
     } catch (IOException e) {
       throw new HiveException("addFiles: filesystem error in check phase", e);
     }
@@ -4089,7 +4090,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
   private static void copyFiles(final HiveConf conf, final FileSystem destFs, FileStatus[] srcs,
       final FileSystem srcFs, final Path destf, final boolean isSrcLocal, boolean isOverwrite,
-      final List<Path> newFiles, boolean acidRename, boolean isManaged) throws HiveException {
+      final List<Path> newFiles, boolean acidRename, boolean isManaged,
+      boolean isCompactionTable) throws HiveException {
     final HdfsUtils.HadoopFileStatus fullDestStatus;
     try {
@@ -4112,8 +4114,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
     // Sort the files
     Arrays.sort(srcs);
     String configuredOwner = HiveConf.getVar(conf, ConfVars.HIVE_LOAD_DATA_OWNER);
+    FileStatus[] files;
     for (FileStatus src : srcs) {
-      FileStatus[] files;
       if (src.isDirectory()) {
         try {
           files = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
@@ -4127,6 +4129,30 @@ private void constructOneLBLocationMap(FileStatus fSta,
         files = new FileStatus[] {src};
       }
 
+      if (isCompactionTable) {
+        // Helper tables used for query-based compaction have a special file structure after
+        // filesink: tmpdir/attemptid/bucketid.
+        // We don't care about the attemptId anymore and don't want it in the table's final
+        // structure so just move the bucket files.
+        try {
+          List<FileStatus> fileStatuses = new ArrayList<>();
+          for (FileStatus file : files) {
+            if (file.isDirectory() && AcidUtils.originalBucketFilter.accept(file.getPath())) {
+              FileStatus[] taskDir = srcFs.listStatus(file.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
+              fileStatuses.addAll(Arrays.asList(taskDir));
+            } else {
+              fileStatuses.add(file);
+            }
+          }
+          files = fileStatuses.toArray(new FileStatus[files.length]);
+        } catch (IOException e) {
+          if (null != pool) {
+            pool.shutdownNow();
+          }
+          throw new HiveException(e);
+        }
+      }
+
       final SessionState parentSession = SessionState.get();
       // Sort the files
       Arrays.sort(files);
@@ -4645,12 +4671,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param newFiles if this is non-null, a list of files that were created as a result of this
    *                 move will be returned.
    * @param isManaged if table is managed.
+   * @param isCompactionTable is table used in query-based compaction
    * @throws HiveException
    */
   static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs,
-      boolean isSrcLocal, boolean isAcidIUD,
-      boolean isOverwrite, List<Path> newFiles, boolean isBucketed,
-      boolean isFullAcidTable, boolean isManaged) throws HiveException {
+      boolean isSrcLocal, boolean isAcidIUD, boolean isOverwrite, List<Path> newFiles, boolean isBucketed,
+      boolean isFullAcidTable, boolean isManaged, boolean isCompactionTable) throws HiveException {
     try {
       // create the destination if it does not exist
       if (!fs.exists(destf)) {
@@ -4686,7 +4712,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       // i.e, like 000000_0, 000001_0_copy_1, 000002_0.gz etc.
       // The extension is only maintained for files which are compressed.
       copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, isOverwrite,
-          newFiles, isFullAcidTable && !isBucketed, isManaged);
+          newFiles, isFullAcidTable && !isBucketed, isManaged, isCompactionTable);
     }
   }
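
The copyFiles() block above drops the task-attempt level for compaction helper tables by replacing each entry that is a directory named like an original bucket file (000000_0, 000001_0, ...) with its children. Below is a hedged, self-contained model of that flattening that uses plain strings and a local map instead of FileSystem and FileStatus; the regex only approximates what AcidUtils.originalBucketFilter accepts, and the paths are invented for the example:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.regex.Pattern;

    public class FlattenTaskDirsSketch {

      // rough stand-in for the names AcidUtils.originalBucketFilter accepts: 000000_0, 000001_0, ...
      private static final Pattern TASK_DIR_NAME = Pattern.compile("[0-9]{6}_[0-9]+");

      public static void main(String[] args) {
        // simulated listing of the filesink output: one task directory plus one plain file
        Map<String, List<String>> children = new HashMap<>();
        children.put("-ext-10002/000000_0", Arrays.asList("-ext-10002/000000_0/bucket_00000"));

        List<String> files = Arrays.asList("-ext-10002/000000_0", "-ext-10002/000001_0");
        List<String> flattened = new ArrayList<>();
        for (String f : files) {
          String name = f.substring(f.lastIndexOf('/') + 1);
          boolean isDirectory = children.containsKey(f);
          if (isDirectory && TASK_DIR_NAME.matcher(name).matches()) {
            flattened.addAll(children.get(f));   // keep only the bucket files under the task dir
          } else {
            flattened.add(f);                    // plain files are copied unchanged
          }
        }
        // prints [-ext-10002/000000_0/bucket_00000, -ext-10002/000001_0]
        System.out.println(flattened);
      }
    }
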
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 1ea3bd3..b46d2ec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat;
@@ -1671,6 +1672,8 @@ public final class GenMapRedUtils {
       fmd = new OrcFileMergeDesc();
     }
     fmd.setIsMmTable(fsInputDesc.isMmTable());
+    boolean isCompactionTable = AcidUtils.isCompactionTable(tblDesc.getProperties());
+    fmd.setIsCompactionTable(isCompactionTable);
     fmd.setWriteId(fsInputDesc.getTableWriteId());
     int stmtId = fsInputDesc.getStatementId();
     fmd.setStmtId(stmtId == -1 ? 0 : stmtId);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
index 6bd0053..0275da5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
@@ -31,6 +31,7 @@ public class FileMergeDesc extends AbstractOperatorDesc {
   private Long writeId;
   private int stmtId;
   private boolean isMmTable;
+  private boolean isCompactionTable;
 
   public FileMergeDesc(DynamicPartitionCtx dynPartCtx, Path outputDir) {
     this.dpCtx = dynPartCtx;
@@ -100,4 +101,12 @@
   public void setIsMmTable(boolean isMmTable) {
     this.isMmTable = isMmTable;
   }
+
+  public boolean getIsCompactionTable() {
+    return isCompactionTable;
+  }
+
+  public void setIsCompactionTable(boolean isCompactionTable) {
+    this.isCompactionTable = isCompactionTable;
+  }
 }
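
The two hunks above wire the flag end to end: GenMapRedUtils derives it from the table properties at plan time and stores it on the FileMergeDesc, and the merge operator later branches on getIsCompactionTable(). A toy model of that plumbing with stand-in classes, not the real Hive types:

    public class CompactionFlagPlumbingSketch {

      // stand-in for FileMergeDesc: a plan-time descriptor carrying the flag
      static class MergeDesc {
        private boolean isCompactionTable;
        boolean getIsCompactionTable() { return isCompactionTable; }
        void setIsCompactionTable(boolean isCompactionTable) { this.isCompactionTable = isCompactionTable; }
      }

      // stand-in for the merge operator: consults the flag at run time
      static class MergeOperator {
        private final MergeDesc conf;
        MergeOperator(MergeDesc conf) { this.conf = conf; }

        String outputNameFor(int bucketId) {
          // per-bucket output only when the plan marked the table as a compaction helper
          return conf.getIsCompactionTable() ? String.format("bucket_%05d", bucketId) : "merged";
        }
      }

      public static void main(String[] args) {
        MergeDesc desc = new MergeDesc();
        desc.setIsCompactionTable(true); // what GenMapRedUtils derives from the table properties
        System.out.println(new MergeOperator(desc).outputNameFor(3)); // prints bucket_00003
      }
    }
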
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
index a0c23b6..f9c10f5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
@@ -83,7 +83,8 @@ public class TestHiveCopyFiles {
     FileSystem targetFs = targetPath.getFileSystem(hiveConf);
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false,null, false, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false,
+          null, false, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -107,7 +108,8 @@
     FileSystem targetFs = targetPath.getFileSystem(hiveConf);
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null, false, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null,
+          false, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -127,7 +129,8 @@
     sourceFolder.newFile("000001_0.gz");
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null, false, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null,
+          false, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -158,7 +161,8 @@
     Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + targetPath.toUri().getPath()));
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false, false,
+          false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -185,7 +189,8 @@
     Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + targetPath.toUri().getPath()));
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null,
+          false, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -205,7 +210,8 @@
     sourceFolder.newFile("000001_0.gz");
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null, false, false, false);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null,
+          false, false, false, false);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);