This is an automated email from the ASF dual-hosted git repository. lpinter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 9251663 HIVE-22610: Minor compaction for MM (insert-only) tables (Karen Coppage, reviewed by Laszlo Pinter) 9251663 is described below commit 92516631ab39f39df5d0692f98ac32c2cd320997 Author: Karen Coppage <karen.copp...@cloudera.com> AuthorDate: Mon Feb 17 14:29:02 2020 +0100 HIVE-22610: Minor compaction for MM (insert-only) tables (Karen Coppage, reviewed by Laszlo Pinter) --- .../hive/ql/txn/compactor/CompactorOnTezTest.java | 240 +++++++++ .../ql/txn/compactor/TestCrudCompactorOnTez.java | 216 +++----- .../ql/txn/compactor/TestMmCompactorOnTez.java | 564 +++++++++++++++++++++ .../hadoop/hive/ql/txn/compactor/CompactorMR.java | 15 - .../ql/txn/compactor/MmMajorQueryCompactor.java | 151 +----- .../ql/txn/compactor/MmMinorQueryCompactor.java | 211 ++++++++ .../ql/txn/compactor/MmQueryCompactorUtils.java | 200 ++++++++ .../ql/txn/compactor/QueryCompactorFactory.java | 6 +- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 37 -- .../hadoop/hive/ql/TestTxnCommandsForMmTable.java | 126 ----- 10 files changed, 1295 insertions(+), 471 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java new file mode 100644 index 0000000..78174f3 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.DriverFactory; +import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.After; +import org.junit.Before; + +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtil.executeStatementOnDriverAndReturnResults; +import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver; + +/** + * Superclass for Test[Crud|Mm]CompactorOnTez, for setup and helper classes. 
+ */ +public class CompactorOnTezTest { + private static final AtomicInteger RANDOM_INT = new AtomicInteger(new Random().nextInt()); + private static final String TEST_DATA_DIR = new File( + System.getProperty("java.io.tmpdir") + File.separator + TestCrudCompactorOnTez.class + .getCanonicalName() + "-" + System.currentTimeMillis() + "_" + RANDOM_INT + .getAndIncrement()).getPath().replaceAll("\\\\", "/"); + private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; + protected HiveConf conf; + protected IMetaStoreClient msClient; + protected IDriver driver; + + @Before + // Note: we create a new conf and driver object before every test + public void setup() throws Exception { + File f = new File(TEST_WAREHOUSE_DIR); + if (f.exists()) { + FileUtil.fullyDelete(f); + } + if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) { + throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR); + } + HiveConf hiveConf = new HiveConf(this.getClass()); + hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, ""); + hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, ""); + hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR); + hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); + hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + TxnDbUtil.setConfValues(hiveConf); + TxnDbUtil.cleanDb(hiveConf); + TxnDbUtil.prepDb(hiveConf); + conf = hiveConf; + // Use tez as execution engine for this test class + setupTez(conf); + msClient = new HiveMetaStoreClient(conf); + driver = DriverFactory.newDriver(conf); + SessionState.start(new CliSessionState(conf)); + } + + private void setupTez(HiveConf conf) { + conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez"); + conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR); + conf.set("tez.am.resource.memory.mb", "128"); + conf.set("tez.am.dag.scheduler.class", + "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled"); + 
conf.setBoolean("tez.local.mode", true); + conf.set("fs.defaultFS", "file:///"); + conf.setBoolean("tez.runtime.optimize.local.fetch", true); + conf.set("tez.staging-dir", TEST_DATA_DIR); + conf.setBoolean("tez.ignore.lib.uris", true); + conf.set("hive.tez.container.size", "128"); + conf.setBoolean("hive.merge.tezfiles", false); + conf.setBoolean("hive.in.tez.test", true); + } + + @After public void tearDown() { + if (msClient != null) { + msClient.close(); + } + if (driver != null) { + driver.close(); + } + conf = null; + } + + protected class TestDataProvider { + + void createFullAcidTable(String tblName, boolean isPartitioned, boolean isBucketed) + throws Exception { + createTable(tblName, isPartitioned, isBucketed, false, "orc"); + } + + void createMmTable(String tblName, boolean isPartitioned, boolean isBucketed) + throws Exception { + createMmTable(tblName, isPartitioned, isBucketed, "orc"); + } + + void createMmTable(String tblName, boolean isPartitioned, boolean isBucketed, String fileFormat) + throws Exception { + createTable(tblName, isPartitioned, isBucketed, true, fileFormat); + } + + private void createTable(String tblName, boolean isPartitioned, boolean isBucketed, + boolean insertOnly, String fileFormat) throws Exception { + + executeStatementOnDriver("drop table if exists " + tblName, driver); + StringBuilder query = new StringBuilder(); + query.append("create table ").append(tblName).append(" (a string, b int)"); + if (isPartitioned) { + query.append(" partitioned by (ds string)"); + } + if (isBucketed) { + query.append(" clustered by (a) into 2 buckets"); + } + query.append(" stored as ").append(fileFormat); + query.append(" TBLPROPERTIES('transactional'='true',"); + if (insertOnly) { + query.append(" 'transactional_properties'='insert_only')"); + } else { + query.append(" 'transactional_properties'='default')"); + } + executeStatementOnDriver(query.toString(), driver); + } + + /** + * 5 txns. 
+ */ + void insertTestDataPartitioned(String tblName) throws Exception { + executeStatementOnDriver("insert into " + tblName + + " values('1',2, 'today'),('1',3, 'today'),('1',4, 'yesterday'),('2',2, 'tomorrow')," + + "('2',3, 'yesterday'),('2',4, 'today')", driver); + executeStatementOnDriver("insert into " + tblName + + " values('3',2, 'tomorrow'),('3',3, 'today'),('3',4, 'yesterday'),('4',2, 'today')," + + "('4',3, 'tomorrow'),('4',4, 'today')", driver); + executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); + executeStatementOnDriver("insert into " + tblName + " values('5',2, 'yesterday'),('5',3, 'yesterday')," + + "('5',4, 'today'),('6',2, 'today'),('6',3, 'today'),('6',4, 'today')", driver); + executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver); + } + + /** + * 3 txns. + */ + protected void insertMmTestDataPartitioned(String tblName) throws Exception { + executeStatementOnDriver("insert into " + tblName + + " values('1',2, 'today'),('1',3, 'today'),('1',4, 'yesterday'),('2',2, 'tomorrow')," + + "('2',3, 'yesterday'),('2',4, 'today')", driver); + executeStatementOnDriver("insert into " + tblName + + " values('3',2, 'tomorrow'),('3',3, 'today'),('3',4, 'yesterday'),('4',2, 'today')," + + "('4',3, 'tomorrow'),('4',4, 'today')", driver); + executeStatementOnDriver("insert into " + tblName + " values('5',2, 'yesterday'),('5',3, 'yesterday')," + + "('5',4, 'today'),('6',2, 'today'),('6',3, 'today'),('6',4, 'today')", driver); + } + + /** + * 5 txns. 
+ */ + protected void insertTestData(String tblName) throws Exception { + executeStatementOnDriver("insert into " + tblName + " values('1',2),('1',3),('1',4),('2',2),('2',3),('2',4)", + driver); + executeStatementOnDriver("insert into " + tblName + " values('3',2),('3',3),('3',4),('4',2),('4',3),('4',4)", + driver); + executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); + executeStatementOnDriver("insert into " + tblName + " values('5',2),('5',3),('5',4),('6',2),('6',3),('6',4)", + driver); + executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver); + } + + /** + * 3 txns. + */ + protected void insertMmTestData(String tblName) throws Exception { + executeStatementOnDriver("insert into " + tblName + " values('1',2),('1',3),('1',4),('2',2),('2',3),('2',4)", + driver); + executeStatementOnDriver("insert into " + tblName + " values('3',2),('3',3),('3',4),('4',2),('4',3),('4',4)", + driver); + executeStatementOnDriver("insert into " + tblName + " values('5',2),('5',3),('5',4),('6',2),('6',3),('6',4)", + driver); + } + + /** + * i * 1.5 txns. + */ + protected void insertTestData(String tblName, int iterations) throws Exception { + for (int i = 0; i < iterations; i++) { + executeStatementOnDriver("insert into " + tblName + " values('" + i + "'," + i + ")", driver); + } + for (int i = 0; i < iterations; i += 2) { + executeStatementOnDriver("delete from " + tblName + " where b = " + i, driver); + } + } + + /** + * i txns. 
+ */ + protected void insertMmTestData(String tblName, int iterations) throws Exception { + for (int i = 0; i < iterations; i++) { + executeStatementOnDriver("insert into " + tblName + " values('" + i + "'," + i + ")", driver); + } + } + + protected List<String> getAllData(String tblName) throws Exception { + List<String> result = executeStatementOnDriverAndReturnResults("select * from " + tblName, driver); + Collections.sort(result); + return result; + } + + protected List<String> getBucketData(String tblName, String bucketId) throws Exception { + return executeStatementOnDriverAndReturnResults( + "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order by ROW__ID", driver); + } + + protected void dropTable(String tblName) throws Exception { + executeStatementOnDriver("drop table " + tblName, driver); + } + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java index 4c01311..9659a3f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -17,21 +17,16 @@ */ package org.apache.hadoop.hive.ql.txn.compactor; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Lists; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import 
org.apache.hadoop.hive.conf.HiveConf; @@ -44,20 +39,14 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.DriverFactory; -import org.apache.hadoop.hive.ql.IDriver; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.streaming.HiveStreamingConnection; import org.apache.hive.streaming.StreamingConnection; import org.apache.hive.streaming.StrictDelimitedInputWriter; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver; @@ -66,68 +55,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @SuppressWarnings("deprecation") -public class TestCrudCompactorOnTez { - private static final AtomicInteger salt = new AtomicInteger(new Random().nextInt()); - private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator - + TestCrudCompactorOnTez.class.getCanonicalName() + "-" + System.currentTimeMillis() + "_" + salt - .getAndIncrement()).getPath().replaceAll("\\\\", "/"); - private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; - private HiveConf conf; - private IMetaStoreClient msClient; - private IDriver driver; - - @Before - // Note: we create a new conf and driver object before every test - public void setup() throws Exception { - File f = new File(TEST_WAREHOUSE_DIR); - if (f.exists()) { - FileUtil.fullyDelete(f); - } - if 
(!(new File(TEST_WAREHOUSE_DIR).mkdirs())) { - throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR); - } - HiveConf hiveConf = new HiveConf(this.getClass()); - hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, ""); - hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, ""); - hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR); - hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); - hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); - TxnDbUtil.setConfValues(hiveConf); - TxnDbUtil.cleanDb(hiveConf); - TxnDbUtil.prepDb(hiveConf); - conf = hiveConf; - // Use tez as execution engine for this test class - setupTez(conf); - msClient = new HiveMetaStoreClient(conf); - driver = DriverFactory.newDriver(conf); - SessionState.start(new CliSessionState(conf)); - } - - private void setupTez(HiveConf conf) { - conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez"); - conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR); - conf.set("tez.am.resource.memory.mb", "128"); - conf.set("tez.am.dag.scheduler.class", "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled"); - conf.setBoolean("tez.local.mode", true); - conf.set("fs.defaultFS", "file:///"); - conf.setBoolean("tez.runtime.optimize.local.fetch", true); - conf.set("tez.staging-dir", TEST_DATA_DIR); - conf.setBoolean("tez.ignore.lib.uris", true); - conf.set("hive.tez.container.size", "128"); - conf.setBoolean("hive.merge.tezfiles", false); - conf.setBoolean("hive.in.tez.test", true); - } - - @After - public void tearDown() { - if (msClient != null) { - msClient.close(); - } - if (driver != null) { - driver.close(); - } - conf = null; - } +public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test public void testMajorCompaction() throws Exception { @@ -217,7 +145,7 @@ public class TestCrudCompactorOnTez { String tableName = "testMinorCompaction"; // Create test table TestDataProvider dataProvider = new 
TestDataProvider(); - dataProvider.createTable(tableName, false, false); + dataProvider.createFullAcidTable(tableName, false, false); // Find the location of the table IMetaStoreClient msClient = new HiveMetaStoreClient(conf); Table table = msClient.getTable(dbName, tableName); @@ -287,7 +215,7 @@ public class TestCrudCompactorOnTez { String tableName = "testMinorCompaction"; // Create test table TestDataProvider dataProvider = new TestDataProvider(); - dataProvider.createTable(tableName, false, true); + dataProvider.createFullAcidTable(tableName, false, true); // Find the location of the table IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); Table table = metaStoreClient.getTable(dbName, tableName); @@ -360,7 +288,7 @@ public class TestCrudCompactorOnTez { String tableName = "testMinorCompaction"; // Create test table TestDataProvider dataProvider = new TestDataProvider(); - dataProvider.createTable(tableName, true, false); + dataProvider.createFullAcidTable(tableName, true, false); // Find the location of the table IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); Table table = metaStoreClient.getTable(dbName, tableName); @@ -440,7 +368,7 @@ public class TestCrudCompactorOnTez { String tableName = "testMinorCompaction"; // Create test table TestDataProvider dataProvider = new TestDataProvider(); - dataProvider.createTable(tableName, true, true); + dataProvider.createFullAcidTable(tableName, true, true); // Find the location of the table IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); Table table = metaStoreClient.getTable(dbName, tableName); @@ -522,7 +450,7 @@ public class TestCrudCompactorOnTez { String tableName = "testMinorCompaction"; // Create test table TestDataProvider dataProvider = new TestDataProvider(); - dataProvider.createTable(tableName, false, false); + dataProvider.createFullAcidTable(tableName, false, false); // Find the location of the table IMetaStoreClient metaStoreClient = new 
HiveMetaStoreClient(conf); Table table = metaStoreClient.getTable(dbName, tableName); @@ -577,7 +505,7 @@ public class TestCrudCompactorOnTez { String tableName = "testMinorCompaction"; // Create test table TestDataProvider dataProvider = new TestDataProvider(); - dataProvider.createTable(tableName, false, true); + dataProvider.createFullAcidTable(tableName, false, true); // Find the location of the table IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); Table table = metaStoreClient.getTable(dbName, tableName); @@ -757,7 +685,7 @@ public class TestCrudCompactorOnTez { String tableName = "testMinorCompaction"; // Create test table TestDataProvider dataProvider = new TestDataProvider(); - dataProvider.createTable(tableName, false, false); + dataProvider.createFullAcidTable(tableName, false, false); // Find the location of the table IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); Table table = metaStoreClient.getTable(dbName, tableName); @@ -786,6 +714,10 @@ public class TestCrudCompactorOnTez { // Verify all contents List<String> actualData = dataProvider.getAllData(tableName); Assert.assertEquals(expectedData, actualData); + // Insert another round of test data + dataProvider.insertTestData(tableName); + expectedData = dataProvider.getAllData(tableName); + Collections.sort(expectedData); // Run a compaction CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true); // Clean up resources @@ -796,7 +728,7 @@ public class TestCrudCompactorOnTez { Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState()); // Verify base directory after compaction Assert.assertEquals("Base directory does not match after major compaction", - Collections.singletonList("base_0000005_v0000023"), + Collections.singletonList("base_0000010_v0000029"), CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null)); // Verify all contents actualData = 
dataProvider.getAllData(tableName); @@ -804,6 +736,62 @@ public class TestCrudCompactorOnTez { } @Test + public void testMinorCompactionAfterMajor() throws Exception { + String dbName = "default"; + String tableName = "testMinorCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createFullAcidTable(tableName, false, false); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + dataProvider.insertTestData(tableName); + // Get all data before compaction is run + List<String> expectedData = dataProvider.getAllData(tableName); + Collections.sort(expectedData); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true); + // Clean up resources + CompactorTestUtil.runCleaner(conf); + // Only 1 compaction should be in the response queue with succeeded state + List<ShowCompactResponseElement> compacts = + TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); + Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size()); + Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState()); + // Verify base directory after compaction + Assert.assertEquals("Base directory does not match after major compaction", + Collections.singletonList("base_0000005_v0000009"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null)); + // Verify all contents + List<String> actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + // Insert another round of test data + dataProvider.insertTestData(tableName); + expectedData = dataProvider.getAllData(tableName); + Collections.sort(expectedData); + // Run a compaction + 
CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + // Clean up resources + CompactorTestUtil.runCleaner(conf); + // 2 compaction should be in the response queue with succeeded state + compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); + Assert.assertEquals("Completed compaction queue must contain one element", 2, compacts.size()); + Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState()); + // Verify base directory after compaction + Assert.assertEquals("Base directory does not match after major compaction", + Collections.singletonList("base_0000005_v0000009"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null)); + Assert.assertEquals("Delta directories do not match after major compaction", + Collections.singletonList("delta_0000001_0000010_v0000020"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); + // Verify all contents + actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + } + + @Test public void testMinorCompactionWhileStreamingWithSplitUpdate() throws Exception { String dbName = "default"; String tableName = "testMinorCompaction"; @@ -967,71 +955,5 @@ public class TestCrudCompactorOnTez { hiveConf.setVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT, "none"); qc.runCompactionQueries(hiveConf, null, sdMock, null, null, emptyQueries, emptyQueries, emptyQueries); Assert.assertEquals("none", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT)); - - } - - private class TestDataProvider { - - private void createTable(String tblName, boolean isPartitioned, boolean isBucketed) throws Exception { - executeStatementOnDriver("drop table if exists " + tblName, driver); - StringBuilder query = new StringBuilder(); - query.append("create table ").append(tblName).append(" (a string, b int)"); - if (isPartitioned) { - query.append(" 
partitioned by (ds string)"); - } - if (isBucketed) { - query.append(" clustered by (a) into 2 buckets"); - } - query.append(" stored as ORC TBLPROPERTIES('transactional'='true'," + " 'transactional_properties'='default')"); - executeStatementOnDriver(query.toString(), driver); - } - - private void insertTestDataPartitioned(String tblName) throws Exception { - executeStatementOnDriver("insert into " + tblName - + " values('1',2, 'today'),('1',3, 'today'),('1',4, 'yesterday'),('2',2, 'tomorrow')," - + "('2',3, 'yesterday'),('2',4, 'today')", driver); - executeStatementOnDriver("insert into " + tblName - + " values('3',2, 'tomorrow'),('3',3, 'today'),('3',4, 'yesterday'),('4',2, 'today')," - + "('4',3, 'tomorrow'),('4',4, 'today')", driver); - executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); - executeStatementOnDriver("insert into " + tblName + " values('5',2, 'yesterday'),('5',3, 'yesterday')," - + "('5',4, 'today'),('6',2, 'today'),('6',3, 'today'),('6',4, 'today')", driver); - executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver); - } - - private void insertTestData(String tblName) throws Exception { - executeStatementOnDriver("insert into " + tblName + " values('1',2),('1',3),('1',4),('2',2),('2',3),('2',4)", - driver); - executeStatementOnDriver("insert into " + tblName + " values('3',2),('3',3),('3',4),('4',2),('4',3),('4',4)", - driver); - executeStatementOnDriver("delete from " + tblName + " where b = 2", driver); - executeStatementOnDriver("insert into " + tblName + " values('5',2),('5',3),('5',4),('6',2),('6',3),('6',4)", - driver); - executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver); - } - - private void insertTestData(String tblName, int iterations) throws Exception { - for (int i = 0; i < iterations; i++) { - executeStatementOnDriver("insert into " + tblName + " values('" + i + "'," + i + ")", driver); - } - for (int i = 0; i < iterations; i += 2) { - 
executeStatementOnDriver("delete from " + tblName + " where b = " + i, driver); - } - } - - private List<String> getAllData(String tblName) throws Exception { - List<String> result = executeStatementOnDriverAndReturnResults("select * from " + tblName, driver); - Collections.sort(result); - return result; - } - - private List<String> getBucketData(String tblName, String bucketId) throws Exception { - return executeStatementOnDriverAndReturnResults( - "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order by ROW__ID", driver); - } - - private void dropTable(String tblName) throws Exception { - executeStatementOnDriver("drop table " + tblName, driver); - } } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java new file mode 100644 index 0000000..074430c --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.DriverFactory; +import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver; + +/** + * Test functionality of MmMinorQueryCompactor,. 
+ */ +public class TestMmCompactorOnTez extends CompactorOnTezTest { + + @Test public void testMmMinorCompactionNotPartitionedWithoutBuckets() throws Exception { + String dbName = "default"; + String tableName = "testMmMinorCompaction"; + // Create test table + TestDataProvider testDataProvider = new TestCrudCompactorOnTez.TestDataProvider(); + testDataProvider.createMmTable(tableName, false, false); + // Find the location of the table + IMetaStoreClient msClient = new HiveMetaStoreClient(conf); + Table table = msClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + testDataProvider.insertMmTestData(tableName); + // Get all data before compaction is run + List<String> expectedData = testDataProvider.getAllData(tableName); + // Verify deltas + Assert.assertEquals("Delta directories does not match", Arrays + .asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", + "delta_0000003_0000003_0000"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + CompactorTestUtil.runCleaner(conf); + verifySuccessulTxn(1); + // Verify delta directories after compaction + List<String> actualDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals("Delta directories does not match after compaction", + Collections.singletonList("delta_0000001_0000003_v0000007"), actualDeltasAfterComp); + // Verify bucket files in delta dirs + List<String> expectedBucketFiles = Collections.singletonList("000000_0"); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0))); + verifyAllContents(tableName, testDataProvider, expectedData); + // Clean up + testDataProvider.dropTable(tableName); + } + + 
@Test public void testMmMinorCompactionNotPartitionedWithBuckets() throws Exception { + String dbName = "default"; + String tableName = "testMmMinorCompaction"; + // expected name of the delta dir that will be created with minor compaction + String newDeltaName = "delta_0000001_0000003_v0000007"; + // Create test table + TestDataProvider testDataProvider = new TestDataProvider(); + testDataProvider.createMmTable(tableName, false, true); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + testDataProvider.insertMmTestData(tableName); + // Get all data before compaction is run + List<String> expectedData = testDataProvider.getAllData(tableName); + // Verify deltas + Assert.assertEquals("Delta directories does not match", Arrays + .asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", + "delta_0000003_0000003_0000"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + CompactorTestUtil.runCleaner(conf); + verifySuccessulTxn(1); + // Verify delta directories after compaction + List<String> actualDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals("Delta directories does not match after compaction", + Collections.singletonList(newDeltaName), actualDeltasAfterComp); + // Verify number of files in directory + FileStatus[] files = fs.listStatus(new Path(table.getSd().getLocation(), newDeltaName), + AcidUtils.hiddenFileFilter); + Assert.assertEquals("Incorrect number of bucket files", 2, files.length); + // Verify bucket files in delta dirs + List<String> expectedBucketFiles = Arrays.asList("000000_0", "000001_0"); + Assert.assertEquals("Bucket names are 
not matching after compaction", expectedBucketFiles, + CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0))); + verifyAllContents(tableName, testDataProvider, expectedData); + // Clean up + testDataProvider.dropTable(tableName); + } + + @Test public void testMmMinorCompactionPartitionedWithoutBuckets() throws Exception { + String dbName = "default"; + String tableName = "testMmMinorCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createMmTable(tableName, true, false); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + dataProvider.insertMmTestDataPartitioned(tableName); + // Get all data before compaction is run + List<String> expectedData = dataProvider.getAllData(tableName); + // Verify deltas + String partitionToday = "ds=today"; + String partitionTomorrow = "ds=tomorrow"; + String partitionYesterday = "ds=yesterday"; + Assert.assertEquals("Delta directories does not match", Arrays + .asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", + "delta_0000003_0000003_0000"), CompactorTestUtil + .getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday)); + // Run a compaction + CompactorTestUtil + .runCompaction(conf, dbName, tableName, CompactionType.MINOR, true, partitionToday, + partitionTomorrow, partitionYesterday); + CompactorTestUtil.runCleaner(conf); + verifySuccessulTxn(3); + // Verify delta directories after compaction in each partition + List<String> actualDeltasAfterCompPartToday = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday); + Assert.assertEquals("Delta directories does not match after compaction", + Collections.singletonList("delta_0000001_0000003_v0000007"), + actualDeltasAfterCompPartToday); + 
// Verify bucket files in delta dirs + List<String> expectedBucketFiles = Collections.singletonList("000000_0"); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil + .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0))); + verifyAllContents(tableName, dataProvider, expectedData); + // Clean up + dataProvider.dropTable(tableName); + } + + @Test public void testMmMinorCompactionPartitionedWithBucketsOrc() throws Exception { + testMmMinorCompactionPartitionedWithBuckets("orc"); + } + + @Test public void testMmMinorCompactionPartitionedWithBucketsParquet() throws Exception { + testMmMinorCompactionPartitionedWithBuckets("parquet"); + } + + @Test public void testMmMinorCompactionPartitionedWithBucketsAvro() throws Exception { + testMmMinorCompactionPartitionedWithBuckets("avro"); + } + + @Test public void testMmMinorCompactionPartitionedWithBucketsTextFile() throws Exception { + testMmMinorCompactionPartitionedWithBuckets("textfile"); + } + + @Test public void testMmMinorCompactionPartitionedWithBucketsSequenceFile() throws Exception { + testMmMinorCompactionPartitionedWithBuckets("sequencefile"); + } + + @Test public void testMmMinorCompactionPartitionedWithBucketsRcFile() throws Exception { + testMmMinorCompactionPartitionedWithBuckets("RcFile"); + } + + @Test public void testMmMinorCompactionPartitionedWithBucketsJsonFile() throws Exception { + testMmMinorCompactionPartitionedWithBuckets("JsonFile"); + } + + private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) throws Exception { + String dbName = "default"; + String tableName = "testMmMinorCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createMmTable(tableName, true, true, fileFormat); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, 
tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + dataProvider.insertMmTestDataPartitioned(tableName); + // Get all data before compaction is run + List<String> expectedData = dataProvider.getAllData(tableName); + // Verify deltas + String partitionToday = "ds=today"; + String partitionTomorrow = "ds=tomorrow"; + String partitionYesterday = "ds=yesterday"; + Assert.assertEquals("Delta directories does not match", Arrays + .asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", + "delta_0000003_0000003_0000"), CompactorTestUtil + .getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday)); + // Run a compaction + CompactorTestUtil + .runCompaction(conf, dbName, tableName, CompactionType.MINOR, true, partitionToday, + partitionTomorrow, partitionYesterday); + CompactorTestUtil.runCleaner(conf); + verifySuccessulTxn(3); + // Verify delta directories after compaction in each partition + List<String> actualDeltasAfterCompPartToday = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday); + Assert.assertEquals("Delta directories does not match after compaction", + Collections.singletonList("delta_0000001_0000003_v0000007"), + actualDeltasAfterCompPartToday); + // Verify bucket files in delta dirs + List<String> expectedBucketFiles = Arrays.asList("000000_0", "000001_0"); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles, + CompactorTestUtil + .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0))); + verifyAllContents(tableName, dataProvider, expectedData); + // Clean up + dataProvider.dropTable(tableName); + } + + @Test public void testMmMinorCompaction10DeltaDirs() throws Exception { + String dbName = "default"; + String tableName = "testMmMinorCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createMmTable(tableName, false, false); 
+ // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + dataProvider.insertMmTestData(tableName, 10); + // Get all data before compaction is run + List<String> expectedData = dataProvider.getAllData(tableName); + Collections.sort(expectedData); + // Verify deltas + List<String> deltaNames = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals(10, deltaNames.size()); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + CompactorTestUtil.runCleaner(conf); + List<ShowCompactResponseElement> compacts = + TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); + Assert.assertEquals("Completed compaction queue must contain 3 element", 1, compacts.size()); + compacts.forEach( + c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState())); + // Verify delta directories after compaction + List<String> actualDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals(Collections.singletonList("delta_0000001_0000010_v0000014"), + actualDeltasAfterComp); + // Verify bucket file in delta dir + List<String> expectedBucketFile = Collections.singletonList("000000_0"); + Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFile, + CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0))); + verifyAllContents(tableName, dataProvider, expectedData); + // Clean up + dataProvider.dropTable(tableName); + } + + @Test public void testMultipleMmMinorCompactions() throws Exception { + String dbName = "default"; + String tableName = "testMmMinorCompaction"; + // Create test table + TestDataProvider dataProvider = new 
TestDataProvider(); + dataProvider.createMmTable(tableName, false, true); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + dataProvider.insertMmTestData(tableName); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + CompactorTestUtil.runCleaner(conf); + verifySuccessulTxn(1); + List<ShowCompactResponseElement> compacts; + // Insert test data into test table + dataProvider.insertMmTestData(tableName); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + CompactorTestUtil.runCleaner(conf); + verifySuccessulTxn(2); + // Insert test data into test table + dataProvider.insertMmTestData(tableName); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + CompactorTestUtil.runCleaner(conf); + verifySuccessulTxn(3); + // Verify delta directories after compaction + List<String> actualDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals("Delta directories does not match after compaction", + Collections.singletonList("delta_0000001_0000009_v0000026"), actualDeltasAfterComp); + + } + + @Test public void testMmMajorCompactionAfterMinor() throws Exception { + String dbName = "default"; + String tableName = "testMmMinorCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createMmTable(tableName, false, false); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + 
dataProvider.insertMmTestData(tableName); + // Get all data before compaction is run + List<String> expectedData = dataProvider.getAllData(tableName); + Collections.sort(expectedData); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + CompactorTestUtil.runCleaner(conf); + verifySuccessulTxn(1); + List<ShowCompactResponseElement> compacts; + // Verify delta directories after compaction + Assert.assertEquals("Delta directories does not match after minor compaction", + Collections.singletonList("delta_0000001_0000003_v0000007"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); + verifyAllContents(tableName, dataProvider, expectedData); + List<String> actualData; + // Insert a second round of test data into test table; update expectedData + dataProvider.insertMmTestData(tableName); + expectedData = dataProvider.getAllData(tableName); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true); + CompactorTestUtil.runCleaner(conf); + verifySuccessulTxn(2); + // Verify base directory after compaction + Assert.assertEquals("Base directory does not match after major compaction", + Collections.singletonList("base_0000006_v0000019"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null)); + actualData = dataProvider.getAllData(tableName); + Assert.assertEquals(expectedData, actualData); + } + + @Test public void testMmMinorCompactionAfterMajor() throws Exception { + String dbName = "default"; + String tableName = "testMmMinorCompaction"; + // Create test table + TestDataProvider dataProvider = new TestDataProvider(); + dataProvider.createMmTable(tableName, false, false); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into 
test table + dataProvider.insertMmTestData(tableName); + // Get all data before compaction is run + List<String> expectedData = dataProvider.getAllData(tableName); + Collections.sort(expectedData); + // Run a major compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true); + CompactorTestUtil.runCleaner(conf); + verifySuccessulTxn(1); + List<ShowCompactResponseElement> compacts; + // Verify base directory after compaction + Assert.assertEquals("Base directory does not match after major compaction", + Collections.singletonList("base_0000003_v0000007"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null)); + verifyAllContents(tableName, dataProvider, expectedData); + // Insert test data into test table + dataProvider.insertMmTestData(tableName); + expectedData = dataProvider.getAllData(tableName); + Collections.sort(expectedData); + // Run a compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + CompactorTestUtil.runCleaner(conf); + verifySuccessulTxn(2); + // Verify base/delta directories after compaction + Assert.assertEquals("Base directory does not match after major compaction", + Collections.singletonList("base_0000003_v0000007"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null)); + Assert.assertEquals("Delta directories does not match after minor compaction", + Collections.singletonList("delta_0000001_0000006_v0000016"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); + verifyAllContents(tableName, dataProvider, expectedData); + } + + @Test public void testMmMinorCompactionWithSchemaEvolutionAndBuckets() throws Exception { + String dbName = "default"; + String tblName = "testMmMinorCompactionWithSchemaEvolutionAndBuckets"; + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("create transactional table " + tblName + + " (a int, b 
int) partitioned by(ds string) clustered by (a) into 2 buckets" + + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true'," + + " 'transactional_properties'='insert_only')", driver); + // Insert some data + executeStatementOnDriver("insert into " + tblName + + " partition (ds) values" + + "(1,2,'today'),(1,3,'today'),(1,4,'yesterday')," + + "(2,2,'yesterday'),(2,3,'today'),(2,4,'today')", + driver); + // Add a new column + executeStatementOnDriver("alter table " + tblName + " add columns(c int)", driver); + // TODO uncomment this line after HIVE-22826 fixed: + // executeStatementOnDriver("alter table " + tblName + " change column a aa int", driver); + // Insert more data + executeStatementOnDriver("insert into " + tblName + + " partition (ds) values" + + "(3,2,1000,'yesterday'),(3,3,1001,'today')," + + "(3,4,1002,'yesterday'),(4,2,1003,'today')," + + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver); + // Get all data before compaction is run + TestDataProvider dataProvider = new TestDataProvider(); + List<String> expectedData = dataProvider.getAllData(tblName); + Collections.sort(expectedData); + // Run minor compaction and cleaner + CompactorTestUtil + .runCompaction(conf, dbName, tblName, CompactionType.MINOR, true, "ds=yesterday", + "ds=today"); + CompactorTestUtil.runCleaner(conf); + verifySuccessulTxn(2); + verifyAllContents(tblName, dataProvider, expectedData); + // Clean up + executeStatementOnDriver("drop table " + tblName, driver); + } + + @Test public void testMmMinorCompactionWithSchemaEvolutionNoBucketsMultipleReducers() + throws Exception { + HiveConf hiveConf = new HiveConf(conf); + hiveConf.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 2); + hiveConf.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 2); + driver = DriverFactory.newDriver(hiveConf); + String dbName = "default"; + String tblName = "testMmMinorCompactionWithSchemaEvolutionNoBucketsMultipleReducers"; + executeStatementOnDriver("drop table if exists " + tblName, 
driver); + executeStatementOnDriver( + "create transactional table " + tblName + " (a int, b int) partitioned by(ds string)" + + " stored as ORC TBLPROPERTIES('transactional'='true'," + + " 'transactional_properties'='insert_only')", driver); + // Insert some data + executeStatementOnDriver("insert into " + tblName + " partition (ds) values" + + "(1,2,'today'),(1,3,'today')," + + "(1,4,'yesterday'),(2,2,'yesterday')," + + "(2,3,'today'),(2,4,'today')", + driver); + // Add a new column + executeStatementOnDriver("alter table " + tblName + " add columns(c int)", driver); + // Insert more data + executeStatementOnDriver("insert into " + tblName + + " partition (ds) values(3,2,1000,'yesterday'),(3,3,1001,'today'),(3,4,1002,'yesterday'),(4,2,1003,'today')," + + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver); + // Get all data before compaction is run + TestDataProvider dataProvider = new TestDataProvider(); + List<String> expectedData = dataProvider.getAllData(tblName); + Collections.sort(expectedData); + // Run minor compaction and cleaner + CompactorTestUtil + .runCompaction(conf, dbName, tblName, CompactionType.MINOR, true, "ds=yesterday", + "ds=today"); + CompactorTestUtil.runCleaner(hiveConf); + verifySuccessulTxn(2); + + verifyAllContents(tblName, dataProvider, expectedData); + // Clean up + executeStatementOnDriver("drop table " + tblName, driver); + } + + @Test public void testMinorMmCompactionRemovesAbortedDirs() + throws Exception { // see mmTableOpenWriteId + String dbName = "default"; + String tableName = "testMmMinorCompaction"; + // Create test table + TestDataProvider testDataProvider = new TestDataProvider(); + testDataProvider.createMmTable(tableName, false, false); + // Find the location of the table + IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf); + Table table = metaStoreClient.getTable(dbName, tableName); + FileSystem fs = FileSystem.get(conf); + // Insert test data into test table + 
testDataProvider.insertMmTestData(tableName); + // Get all data before compaction is run. Expected data is 2 x MmTestData insertion + List<String> expectedData = new ArrayList<>(); + List<String> oneMmTestDataInsertion = testDataProvider.getAllData(tableName); + expectedData.addAll(oneMmTestDataInsertion); + expectedData.addAll(oneMmTestDataInsertion); + Collections.sort(expectedData); + // Insert an aborted directory (txns 4-6) + rollbackAllTxns(true, driver); + testDataProvider.insertMmTestData(tableName); + rollbackAllTxns(false, driver); + // Check that delta dirs 4-6 exist + List<String> actualDeltasAfterComp = + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null); + Assert.assertEquals(Lists + .newArrayList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000", + "delta_0000003_0000003_0000", "delta_0000004_0000004_0000", + "delta_0000005_0000005_0000", "delta_0000006_0000006_0000"), actualDeltasAfterComp); + // Insert another round of test data (txns 7-9) + testDataProvider.insertMmTestData(tableName); + verifyAllContents(tableName, testDataProvider, expectedData); + // Run a minor compaction + CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true); + CompactorTestUtil.runCleaner(conf); + verifySuccessulTxn(1); + // Verify delta directories after compaction + Assert.assertEquals("Delta directories does not match after minor compaction", + Collections.singletonList("delta_0000001_0000009_v0000014"), + CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null)); + verifyAllContents(tableName, testDataProvider, expectedData); + } + + /** + * Verify that the expected number of transactions have run, and their state is "succeeded". 
+ * + * @param expectedCompleteCompacts number of compactions already run + * @throws MetaException + */ + private void verifySuccessulTxn(int expectedCompleteCompacts) throws MetaException { + List<ShowCompactResponseElement> compacts = + TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts(); + Assert.assertEquals("Completed compaction queue must contain one element", + expectedCompleteCompacts, compacts.size()); + compacts.forEach( + c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState())); + } + + /** + * Verifies that a select on the table returns the same data as expectedData. + */ + private void verifyAllContents(String tblName, TestDataProvider dataProvider, + List<String> expectedData) throws Exception { + List<String> actualData = dataProvider.getAllData(tblName); + Collections.sort(actualData); + Assert.assertEquals(expectedData, actualData); + } + + /** + * Set to true to cause all transactions to be rolled back, until set back to false. + */ + private static void rollbackAllTxns(boolean val, IDriver driver) { + driver.getConf().setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, val); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index bb70db4..739f2b6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -394,21 +394,6 @@ public class CompactorMR { HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); } - // Remove the directories for aborted transactions only - private void removeFilesForMmTable(HiveConf conf, Directory dir) throws IOException { - // For MM table, we only want to delete delta dirs for aborted txns. 
- List<Path> filesToDelete = dir.getAbortedDirectories(); - if (filesToDelete.size() < 1) { - return; - } - LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir); - FileSystem fs = filesToDelete.get(0).getFileSystem(conf); - for (Path dead : filesToDelete) { - LOG.debug("Going to delete path " + dead.toString()); - fs.delete(dead, true); - } - } - public JobConf getMrJob() { return mrJob; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java index bad5d00..48387c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java @@ -20,37 +20,23 @@ package org.apache.hadoop.hive.ql.txn.compactor; import com.google.common.collect.Lists; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; -import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import 
org.apache.hadoop.hive.ql.util.DirectionUtils; -import org.apache.hadoop.util.StringUtils; -import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.Ref; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; /** * Class responsible to run query based major compaction on insert only tables. @@ -66,14 +52,7 @@ final class MmMajorQueryCompactor extends QueryCompactor { AcidUtils.Directory dir = AcidUtils .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false, table.getParameters(), false); - removeFilesForMmTable(hiveConf, dir); - - // Then, actually do the compaction. - if (!compactionInfo.isMajorCompaction()) { - // Not supported for MM tables right now. - LOG.info("Not compacting " + storageDescriptor.getLocation() + "; not a major compaction"); - return; - } + MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir); if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) { return; @@ -129,107 +108,10 @@ final class MmMajorQueryCompactor extends QueryCompactor { fs.delete(fromPath, true); } - // Remove the directories for aborted transactions only - private void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException { - // For MM table, we only want to delete delta dirs for aborted txns. 
- List<Path> filesToDelete = dir.getAbortedDirectories(); - if (filesToDelete.size() < 1) { - return; - } - LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir); - FileSystem fs = filesToDelete.get(0).getFileSystem(conf); - for (Path dead : filesToDelete) { - LOG.debug("Going to delete path " + dead.toString()); - fs.delete(dead, true); - } - } - - private List<String> getCreateQueries(String fullName, Table t, StorageDescriptor sd, String location) { - StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append("("); - List<FieldSchema> cols = t.getSd().getCols(); - boolean isFirst = true; - for (FieldSchema col : cols) { - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append("`").append(col.getName()).append("` ").append(col.getType()); - } - query.append(") "); - - // Bucketing. - List<String> buckCols = t.getSd().getBucketCols(); - if (buckCols.size() > 0) { - query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") "); - List<Order> sortCols = t.getSd().getSortCols(); - if (sortCols.size() > 0) { - query.append("SORTED BY ("); - isFirst = true; - for (Order sortCol : sortCols) { - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder())); - } - query.append(") "); - } - query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS"); - } - - // Stored as directories. We don't care about the skew otherwise. 
- if (t.getSd().isStoredAsSubDirectories()) { - SkewedInfo skewedInfo = t.getSd().getSkewedInfo(); - if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) { - query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON "); - isFirst = true; - for (List<String> colValues : skewedInfo.getSkewedColValues()) { - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append("('").append(StringUtils.join("','", colValues)).append("')"); - } - query.append(") STORED AS DIRECTORIES"); - } - } - - SerDeInfo serdeInfo = sd.getSerdeInfo(); - Map<String, String> serdeParams = serdeInfo.getParameters(); - query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib())) - .append("'"); - String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE); - assert sh == null; // Not supposed to be a compactable table. - if (!serdeParams.isEmpty()) { - ShowCreateTableOperation.appendSerdeParams(query, serdeParams); - } - query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat())) - .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())) - .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES ("); - // Exclude all standard table properties. 
- Set<String> excludes = getHiveMetastoreConstants(); - excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS); - isFirst = true; - for (Map.Entry<String, String> e : t.getParameters().entrySet()) { - if (e.getValue() == null) { - continue; - } - if (excludes.contains(e.getKey())) { - continue; - } - if (!isFirst) { - query.append(", "); - } - isFirst = false; - query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue())) - .append("'"); - } - if (!isFirst) { - query.append(", "); - } - query.append("'transactional'='false')"); - return Lists.newArrayList(query.toString()); - + private List<String> getCreateQueries(String tmpTableName, Table table, + StorageDescriptor storageDescriptor, String baseLocation) { + return Lists.newArrayList(MmQueryCompactorUtils + .getCreateQuery(tmpTableName, table, storageDescriptor, baseLocation, false, false)); } private List<String> getCompactionQueries(Table t, Partition p, String tmpName) { @@ -263,28 +145,7 @@ final class MmMajorQueryCompactor extends QueryCompactor { } private List<String> getDropQueries(String tmpTableName) { - return Lists.newArrayList("drop table if exists " + tmpTableName); + return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableName); } - private static Set<String> getHiveMetastoreConstants() { - Set<String> result = new HashSet<>(); - for (Field f : hive_metastoreConstants.class.getDeclaredFields()) { - if (!Modifier.isStatic(f.getModifiers())) { - continue; - } - if (!Modifier.isFinal(f.getModifiers())) { - continue; - } - if (!String.class.equals(f.getType())) { - continue; - } - f.setAccessible(true); - try { - result.add((String) f.get(null)); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - } - return result; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java new file mode 100644 
index 0000000..feb667c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import com.google.common.collect.Lists; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.txn.CompactionInfo; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hive.common.util.Ref; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Run a minor 
query compaction on an insert only (MM) table. + */ +final class MmMinorQueryCompactor extends QueryCompactor { + + private static final Logger LOG = LoggerFactory.getLogger(MmMinorQueryCompactor.class.getName()); + + @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, + StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo) + throws IOException { + LOG.debug( + "Going to delete directories for aborted transactions for MM table " + table.getDbName() + + "." + table.getTableName()); + + AcidUtils.Directory dir = AcidUtils + .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, + Ref.from(false), false, table.getParameters(), false); + MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir); + String tmpLocation = Util.generateTmpPath(storageDescriptor); + Path sourceTabLocation = new Path(tmpLocation); + Path resultTabLocation = new Path(tmpLocation, "_result"); + + HiveConf driverConf = setUpDriverSession(hiveConf); + + String tmpPrefix = table.getDbName() + ".tmp_minor_compactor_" + table.getTableName() + "_"; + String tmpTableBase = tmpPrefix + System.currentTimeMillis(); + + List<String> createTableQueries = + getCreateQueries(tmpTableBase, table, partition == null ? table.getSd() : partition.getSd(), + sourceTabLocation.toString(), resultTabLocation.toString(), dir, writeIds); + List<String> compactionQueries = getCompactionQueries(tmpTableBase, table.getSd()); + List<String> dropQueries = getDropQueries(tmpTableBase); + runCompactionQueries(driverConf, tmpTableBase, storageDescriptor, writeIds, compactionInfo, + createTableQueries, compactionQueries, dropQueries); + } + + /** + * Move files from "result table" directory to table/partition to compact's directory. 
+ */ + @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf, + ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException { + org.apache.hadoop.hive.ql.metadata.Table resultTable = + Hive.get().getTable(tmpTableName + "_result"); + String from = resultTable.getSd().getLocation(); + Path fromPath = new Path(from); + Path toPath = new Path(dest); + FileSystem fs = fromPath.getFileSystem(conf); + long maxTxn = actualWriteIds.getHighWatermark(); + AcidOutputFormat.Options options = + new AcidOutputFormat.Options(conf).writingBase(false).isCompressed(false) + .minimumWriteId(1).maximumWriteId(maxTxn).bucket(0).statementId(-1) + .visibilityTxnId(compactorTxnId); + Path newDeltaDir = AcidUtils.createFilename(toPath, options).getParent(); + if (!fs.exists(fromPath)) { + LOG.info(from + " not found. Assuming 0 splits. Creating " + newDeltaDir); + fs.mkdirs(newDeltaDir); + return; + } + LOG.info("Moving contents of " + from + " to " + dest); + fs.rename(fromPath, newDeltaDir); + fs.delete(fromPath, true); + } + + /** + * Get a list of create/alter table queries. These tables serves as temporary data source for + * query based minor compaction. The following tables are created: + * <ol> + * <li>tmpTable - "source table": temporary, external, partitioned table. 
Each partition + * points to exactly one delta directory in the table/partition to compact</li> + * <li>tmpTable_result - "result table" : temporary table which stores the aggregated + * results of the minor compaction query until the compaction can be committed</li> + * </ol> + * + * @param tmpTableBase name of the first temp table (second will be $tmpTableBase_result) + * @param t Table to compact + * @param sd storage descriptor of table or partition to compact + * @param sourceTabLocation location the "source table" (temp table 1) should go + * @param resultTabLocation location the "result table (temp table 2) should go + * @param dir the parent directory of delta directories + * @param validWriteIdList valid write ids for the table/partition to compact + * @return List of 3 query strings: 2 create table, 1 alter table + */ + private List<String> getCreateQueries(String tmpTableBase, Table t, StorageDescriptor sd, + String sourceTabLocation, String resultTabLocation, AcidUtils.Directory dir, + ValidWriteIdList validWriteIdList) { + List<String> queries = new ArrayList<>(); + queries.add( + MmQueryCompactorUtils.getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true, true)); + buildAlterTableQuery(tmpTableBase, dir, validWriteIdList).ifPresent(queries::add); + queries.add(MmQueryCompactorUtils + .getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false, false)); + return queries; + } + + /** + * Builds an alter table query, which adds partitions pointing to location of delta directories. + * + * @param tableName name of the temp table to be altered + * @param dir the parent directory of delta directories + * @param validWriteIdList valid write ids for the table/partition to compact + * @return alter table statement wrapped in {@link Optional}. 
+ */ + private Optional<String> buildAlterTableQuery(String tableName, AcidUtils.Directory dir, + ValidWriteIdList validWriteIdList) { + if (!dir.getCurrentDirectories().isEmpty()) { + long minWriteID = + validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId(); + long highWatermark = validWriteIdList.getHighWatermark(); + List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter( + delta -> delta.getMaxWriteId() <= highWatermark && delta.getMinWriteId() >= minWriteID) + .collect(Collectors.toList()); + if (!deltas.isEmpty()) { + StringBuilder query = new StringBuilder().append("alter table ").append(tableName); + query.append(" add "); + deltas.forEach( + delta -> query.append("partition (file_name='").append(delta.getPath().getName()) + .append("') location '").append(delta.getPath()).append("' ")); + return Optional.of(query.toString()); + } + } + return Optional.empty(); + } + + /** + * Get a list containing just the minor compaction query. The query selects the content of the + * source temporary table and inserts it into the resulttable. It will look like: + * <ol> + * <li>insert into table $tmpTableBase_result select `col_1`, .. 
from tmpTableBase</li> + * </ol> + * + * @param tmpTableBase an unique identifier, which helps to find all the temporary tables + * @return list of compaction queries, always non-null + */ + private List<String> getCompactionQueries(String tmpTableBase, StorageDescriptor sd) { + String resultTableName = tmpTableBase + "_result"; + StringBuilder query = new StringBuilder().append("insert into table ").append(resultTableName) + .append(" select "); + List<FieldSchema> cols = sd.getCols(); + boolean isFirst = true; + for (FieldSchema col : cols) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("`").append(col.getName()).append("`"); + } + query.append(" from ").append(tmpTableBase); + return Lists.newArrayList(query.toString()); + } + + /** + * Get list of drop table statements. + * @param tmpTableBase an unique identifier, which helps to find all the temp tables + * @return list of drop table statements, always non-null + */ + private List<String> getDropQueries(String tmpTableBase) { + return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableBase, + MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableBase + "_result"); + } + + private HiveConf setUpDriverSession(HiveConf hiveConf) { + HiveConf driverConf = new HiveConf(hiveConf); + driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column"); + driverConf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS, false); + driverConf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_ESTIMATE_STATS, false); + return driverConf; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java new file mode 100644 index 0000000..891696d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn.compactor; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.SkewedInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.util.DirectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hive.common.util.HiveStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +final class MmQueryCompactorUtils { + + private static final Logger LOG = 
LoggerFactory.getLogger(MmQueryCompactorUtils.class.getName()); + static final String DROP_IF_EXISTS = "drop table if exists "; + + private MmQueryCompactorUtils() {} + + /** + * Creates a command to create a new table based on an example table (sourceTab). + * + * @param fullName of new table + * @param sourceTab the table we are modeling the new table on + * @param sd StorageDescriptor of the table or partition we are modeling the new table on + * @param location of the new table + * @param isPartitioned should the new table be partitioned + * @param isExternal should the new table be external + * @return query string creating the new table + */ + static String getCreateQuery(String fullName, Table sourceTab, StorageDescriptor sd, + String location, boolean isPartitioned, boolean isExternal) { + StringBuilder query = new StringBuilder("create temporary "); + if (isExternal) { + query.append("external "); + } + query.append("table ").append(fullName).append("("); + List<FieldSchema> cols = sourceTab.getSd().getCols(); + boolean isFirst = true; + for (FieldSchema col : cols) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("`").append(col.getName()).append("` ").append(col.getType()); + } + query.append(") "); + + // Partitioning. Used for minor compaction. + if (isPartitioned) { + query.append(" PARTITIONED BY (`file_name` STRING) "); + } + + // Bucketing. 
+ List<String> buckCols = sourceTab.getSd().getBucketCols(); + if (buckCols.size() > 0) { + query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") "); + List<Order> sortCols = sourceTab.getSd().getSortCols(); + if (sortCols.size() > 0) { + query.append("SORTED BY ("); + isFirst = true; + for (Order sortCol : sortCols) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder())); + } + query.append(") "); + } + query.append("INTO ").append(sourceTab.getSd().getNumBuckets()).append(" BUCKETS"); + } + + // Stored as directories. We don't care about the skew otherwise. + if (sourceTab.getSd().isStoredAsSubDirectories()) { + SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo(); + if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) { + query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON "); + isFirst = true; + for (List<String> colValues : skewedInfo.getSkewedColValues()) { + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("('").append(StringUtils.join("','", colValues)).append("')"); + } + query.append(") STORED AS DIRECTORIES"); + } + } + + SerDeInfo serdeInfo = sd.getSerdeInfo(); + Map<String, String> serdeParams = serdeInfo.getParameters(); + query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib())) + .append("'"); + String sh = sourceTab.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE); + assert sh == null; // Not supposed to be a compactable table. 
+ if (!serdeParams.isEmpty()) { + ShowCreateTableOperation.appendSerdeParams(query, serdeParams); + } + query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat())) + .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())) + .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES ("); + // Exclude all standard table properties. + Set<String> excludes = getHiveMetastoreConstants(); + excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS); + isFirst = true; + for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) { + if (e.getValue() == null) { + continue; + } + if (excludes.contains(e.getKey())) { + continue; + } + if (!isFirst) { + query.append(", "); + } + isFirst = false; + query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue())) + .append("'"); + } + if (!isFirst) { + query.append(", "); + } + query.append("'transactional'='false')"); + return query.toString(); + + } + + private static Set<String> getHiveMetastoreConstants() { + Set<String> result = new HashSet<>(); + for (Field f : hive_metastoreConstants.class.getDeclaredFields()) { + if (!Modifier.isStatic(f.getModifiers())) { + continue; + } + if (!Modifier.isFinal(f.getModifiers())) { + continue; + } + if (!String.class.equals(f.getType())) { + continue; + } + f.setAccessible(true); + try { + result.add((String) f.get(null)); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + return result; + } + + /** + * Remove the delta directories of aborted transactions. 
+ */ + static void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException { + List<Path> filesToDelete = dir.getAbortedDirectories(); + if (filesToDelete.size() < 1) { + return; + } + LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir); + FileSystem fs = filesToDelete.get(0).getFileSystem(conf); + for (Path dead : filesToDelete) { + LOG.debug("Going to delete path " + dead.toString()); + fs.delete(dead, true); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java index 2f2bb21..6542eef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java @@ -62,7 +62,11 @@ final class QueryCompactorFactory { if (AcidUtils.isInsertOnlyTable(table.getParameters()) && HiveConf .getBoolVar(configuration, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) { - return new MmMajorQueryCompactor(); + if (compactionInfo.isMajorCompaction()) { + return new MmMajorQueryCompactor(); + } else { + return new MmMinorQueryCompactor(); + } } return null; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 88ca683..e56d831 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -1978,43 +1978,6 @@ public class TestTxnCommands2 { } /** - * Test compaction for Micro-managed table - * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables - * 2. Compactions will only remove subdirectories for aborted transactions of MM tables, if any - * @throws Exception - */ - @Test - public void testMmTableCompaction() throws Exception { - // 1. 
Insert some rows into MM table - runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)"); - runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)"); - // There should be 2 delta directories - verifyDirAndResult(2); - - // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs should stay. - runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'"); - runWorker(hiveConf); - verifyDirAndResult(2); - - // 3. Let a transaction be aborted - hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); - runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(5,6)"); - hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); - // There should be 3 delta directories. The new one is the aborted one. - verifyDirAndResult(3); - - // 4. Perform a MINOR compaction again. This time it will remove the subdir for aborted transaction. - runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'"); - runWorker(hiveConf); - // The worker should remove the subdir for aborted transaction - verifyDirAndResult(2); - - // 5. Run Cleaner. Shouldn't impact anything. - runCleaner(hiveConf); - verifyDirAndResult(2); - } - - /** * Test cleaner for TXN_TO_WRITE_ID table. * @throws Exception */ diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java index aabf15c..1de25cf 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java @@ -105,42 +105,6 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests { runStatementOnDriver("drop table if exists " + t); } } - /** - * Test compaction for Micro-managed table - * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables - * 2. 
Compactions will only remove subdirectories for aborted transactions of MM tables, if any - * @throws Exception - */ - @Test - public void testMmTableCompaction() throws Exception { - // 1. Insert some rows into MM table - runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)"); - runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)"); - // There should be 2 delta directories - verifyDirAndResult(2); - - // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs should stay. - runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'"); - runWorker(hiveConf); - verifyDirAndResult(2); - - // 3. Let a transaction be aborted - hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); - runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(5,6)"); - hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); - // There should be 3 delta directories. The new one is the aborted one. - verifyDirAndResult(3); - - // 4. Perform a MINOR compaction again. This time it will remove the subdir for aborted transaction. - runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'"); - runWorker(hiveConf); - // The worker should remove the subdir for aborted transaction - verifyDirAndResult(2); - - // 5. Run Cleaner. Shouldn't impact anything. - runCleaner(hiveConf); - verifyDirAndResult(2); - } /** * Test a scenario, on a micro-managed table, where an IOW comes in @@ -523,96 +487,6 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests { verifyDirAndResult(0, true); } - @Test - public void testSnapshotIsolationWithAbortedTxnOnMmTable() throws Exception { - - // Insert two rows into the table. 
- runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)"); - runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)"); - // There should be 2 delta directories - verifyDirAndResult(2); - - // Initiate a minor compaction request on the table. - runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MINOR'"); - - // Run Compaction Worker to do compaction. - // But we do not compact a MM table but only transit the compaction request to - // "ready for cleaning" state in this case. - runWorker(hiveConf); - verifyDirAndResult(2); - - // Start an INSERT statement transaction and roll back this transaction. - hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); - runStatementOnDriver("insert into " + TableExtended.MMTBL + " values (5, 6)"); - hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); - /** - * There should be 3 delta directories. The new one is the aborted one. - * - * target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1541637725613/warehouse/mmtbl/ - ├── delta_0000001_0000001_0000 - │ └── 000000_0 - ├── delta_0000002_0000002_0000 - │ └── 000000_0 - └── delta_0000003_0000003_0000 - └── 000000_0 - */ - verifyDirAndResult(3); - - // Execute SELECT statement and verify the result set (should be two rows). - int[][] expected = new int[][] {{1, 2}, {3, 4}}; - List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); - Assert.assertEquals(stringifyValues(expected), rs); - - // Run Cleaner. - // delta_0000003_0000003_0000 produced by the aborted txn is removed even though it is - // above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID since all data in it is aborted - // This run does transition the entry "successful". - runCleaner(hiveConf); - verifyDirAndResult(2); - - // Execute SELECT and verify that aborted operation is not counted for MM table. 
- rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); - Assert.assertEquals(stringifyValues(expected), rs); - - // Run initiator to execute CompactionTxnHandler.cleanEmptyAbortedTxns() - Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), - 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); - TestTxnCommands2.runInitiator(hiveConf); - // This run of Initiator doesn't add any compaction_queue entry - // since we only have one MM table with data - we don't compact MM tables. - verifyDirAndResult(2); - Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), - 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); - - // Execute SELECT statement and verify that aborted INSERT statement is not counted. - rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); - Assert.assertEquals(stringifyValues(expected), rs); - - // Initiate a minor compaction request on the table. - runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MINOR'"); - - // Run worker to delete aborted transaction's delta directory. - runWorker(hiveConf); - Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), - 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); - Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_COMPONENTS"), - 1, - TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_COMPONENTS")); - verifyDirAndResult(2); - - // Run Cleaner to delete rows for the aborted transaction - // from TXN_COMPONENTS. - runCleaner(hiveConf); - - // Run initiator to clean the row fro the aborted transaction from TXNS. 
- TestTxnCommands2.runInitiator(hiveConf); - Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), - 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS")); - Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_COMPONENTS"), - 0, - TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_COMPONENTS")); - } - private void verifyDirAndResult(int expectedDeltas) throws Exception { verifyDirAndResult(expectedDeltas, false); }