Repository: hive Updated Branches: refs/heads/branch-1 73a677be3 -> 214e4b6ff
HIVE-10632 : Make sure TXN_COMPONENTS gets cleaned up if table is dropped before compaction (Wei Zheng, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/214e4b6f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/214e4b6f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/214e4b6f Branch: refs/heads/branch-1 Commit: 214e4b6ffedbdc0f610babcf1156cd32f0659db3 Parents: 73a677b Author: Wei Zheng <w...@apache.org> Authored: Thu Mar 10 16:57:26 2016 -0800 Committer: Wei Zheng <w...@apache.org> Committed: Thu Mar 10 16:57:26 2016 -0800 ---------------------------------------------------------------------- .../hive/metastore/AcidEventListener.java | 69 +++++++ .../hadoop/hive/metastore/HiveMetaStore.java | 1 + .../hadoop/hive/metastore/txn/TxnDbUtil.java | 20 +- .../hadoop/hive/metastore/txn/TxnHandler.java | 183 +++++++++++++++++- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 25 +-- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 +-- .../hive/ql/lockmgr/TestDbTxnManager2.java | 186 +++++++++++++++++++ 7 files changed, 460 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java new file mode 100644 index 0000000..767bc54 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.HiveObjectType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; + + +/** + * It handles cleanup of dropped partition/table/database in ACID related metastore tables + */ +public class AcidEventListener extends MetaStoreEventListener { + + private TxnHandler txnHandler; + private HiveConf hiveConf; + + public AcidEventListener(Configuration configuration) { + super(configuration); + hiveConf = (HiveConf) configuration; + } + + @Override + public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException { + // We can loop thru all the tables to check if they are ACID first and then perform cleanup, + // but it's more efficient to unconditionally perform cleanup for the database, especially + // when there are a lot of tables + txnHandler = new TxnHandler(hiveConf); + txnHandler.cleanupRecords(HiveObjectType.DATABASE, dbEvent.getDatabase(), null, null); + } + + @Override + public void onDropTable(DropTableEvent tableEvent) throws MetaException { + if (TxnHandler.isAcidTable(tableEvent.getTable())) { + txnHandler = new TxnHandler(hiveConf); + txnHandler.cleanupRecords(HiveObjectType.TABLE, null, tableEvent.getTable(), null); + } + } + + @Override + public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException { + if (TxnHandler.isAcidTable(partitionEvent.getTable())) { + txnHandler = new TxnHandler(hiveConf); + txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(), + partitionEvent.getPartitionIterator()); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index b702caa..fba545d 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -488,6 +488,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, hiveConf, hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS)); listeners.add(new SessionPropertiesListener(hiveConf)); + listeners.add(new AcidEventListener(hiveConf)); endFunctionListeners = MetaStoreUtils.getMetaStoreListeners( MetaStoreEndFunctionListener.class, hiveConf, hiveConf.getVar(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS)); http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 5704b5f..17e7c85 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -32,7 +32,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.shims.ShimLoader; /** - * Utility methods for creating and destroying txn database/schema. + * Utility methods for creating and destroying txn database/schema, plus methods for + * querying against metastore tables. * Placed here in a separate class so it can be shared across unit tests. */ public final class TxnDbUtil { @@ -142,8 +143,13 @@ public final class TxnDbUtil { conn.commit(); } catch (SQLException e) { + try { + conn.rollback(); + } catch (SQLException re) { + System.err.println("Error rolling back: " + re.getMessage()); + } + // This might be a deadlock, if so, let's retry - conn.rollback(); if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) { LOG.warn("Caught deadlock, retrying db creation"); prepDb(); @@ -219,14 +225,20 @@ public final class TxnDbUtil { } } - public static int findNumCurrentLocks() throws Exception { + /** + * Utility method used to run COUNT queries like "select count(*) from ..." against metastore tables + * @param countQuery countQuery text + * @return count countQuery result + * @throws Exception + */ + public static int countQueryAgent(String countQuery) throws Exception { Connection conn = null; Statement stmt = null; ResultSet rs = null; try { conn = getConnection(); stmt = conn.createStatement(); - rs = stmt.executeQuery("select count(*) from hive_locks"); + rs = stmt.executeQuery(countQuery); if (!rs.next()) { return 0; } http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index fd66415..59c1637 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.StringUtils; @@ -1180,6 +1181,186 @@ public class TxnHandler { } /** + * Clean up corresponding records in metastore tables, specifically: + * TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS + */ + public void cleanupRecords(HiveObjectType type, Database db, Table table, + Iterator<Partition> partitionIterator) throws MetaException { + try { + Connection dbConn = null; + Statement stmt = null; + + try { + String dbName; + String tblName; + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + List<String> queries = new ArrayList<String>(); + StringBuilder buff = new StringBuilder(); + + switch (type) { + case DATABASE: + dbName = db.getName(); + + buff.append("delete from TXN_COMPONENTS where tc_database='"); + buff.append(dbName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); + buff.append(dbName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPACTION_QUEUE where cq_database='"); + buff.append(dbName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); + buff.append(dbName); + buff.append("'"); + queries.add(buff.toString()); + + break; + case TABLE: + dbName = table.getDbName(); + tblName = table.getTableName(); + + buff.append("delete from TXN_COMPONENTS where tc_database='"); + buff.append(dbName); + buff.append("' and tc_table='"); + buff.append(tblName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); + buff.append(dbName); + buff.append("' and ctc_table='"); + buff.append(tblName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPACTION_QUEUE where cq_database='"); + buff.append(dbName); + buff.append("' and cq_table='"); + buff.append(tblName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); + buff.append(dbName); + buff.append("' and cc_table='"); + buff.append(tblName); + buff.append("'"); + queries.add(buff.toString()); + + break; + case PARTITION: + dbName = table.getDbName(); + tblName = table.getTableName(); + List<FieldSchema> partCols = table.getPartitionKeys(); // partition columns + List<String> partVals; // partition values + String partName; + + while (partitionIterator.hasNext()) { + Partition p = partitionIterator.next(); + partVals = p.getValues(); + partName = Warehouse.makePartName(partCols, partVals); + + buff.append("delete from TXN_COMPONENTS where tc_database='"); + buff.append(dbName); + buff.append("' and tc_table='"); + buff.append(tblName); + buff.append("' and tc_partition='"); + buff.append(partName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='"); + buff.append(dbName); + buff.append("' and ctc_table='"); + buff.append(tblName); + buff.append("' and ctc_partition='"); + buff.append(partName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPACTION_QUEUE where cq_database='"); + buff.append(dbName); + buff.append("' and cq_table='"); + buff.append(tblName); + buff.append("' and cq_partition='"); + buff.append(partName); + buff.append("'"); + queries.add(buff.toString()); + + buff.setLength(0); + buff.append("delete from COMPLETED_COMPACTIONS where cc_database='"); + buff.append(dbName); + buff.append("' and cc_table='"); + buff.append(tblName); + buff.append("' and cc_partition='"); + buff.append(partName); + buff.append("'"); + queries.add(buff.toString()); + } + + break; + default: + throw new MetaException("Invalid object type for cleanup: " + type); + } + + for (String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + stmt.executeUpdate(query); + } + + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "cleanupRecords"); + if (e.getMessage().contains("does not exist")) { + LOG.warn("Cannot perform cleanup since metastore table does not exist"); + } else { + throw new MetaException("Unable to clean up " + StringUtils.stringifyException(e)); + } + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + } catch (RetryException e) { + cleanupRecords(type, db, table, partitionIterator); + } + } + + + /** Checks if a table is a valid ACID table. + * Note, users are responsible for using the correct TxnManager. We do not look at + * SessionState.get().getTxnMgr().supportsAcid() here + * @param table table + * @return true if table is a legit ACID table, false otherwise + */ + public static boolean isAcidTable(Table table) { + if (table == null) { + return false; + } + Map<String, String> parameters = table.getParameters(); + String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); + } + + /** * For testing only, do not use. */ @VisibleForTesting @@ -1626,7 +1807,7 @@ public class TxnHandler { TxnDbUtil.prepDb(); } catch (Exception e) { // We may have already created the tables and thus don't need to redo it. - if (!e.getMessage().contains("already exists")) { + if (e.getMessage() != null && !e.getMessage().contains("already exists")) { throw new RuntimeException("Unable to set up transaction database for" + " testing: " + e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index d98fa93..15a61d6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -621,12 +621,7 @@ public class AcidUtils { HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable); } - /** Checks metadata to make sure it's a valid ACID table at metadata level - * Three things we will check: - * 1. TBLPROPERTIES 'transactional'='true' - * 2. The table should be bucketed - * 3. InputFormatClass/OutputFormatClass should implement AcidInputFormat/AcidOutputFormat - * Currently OrcInputFormat/OrcOutputFormat is the only implementer + /** Checks if a table is a valid ACID table. * Note, users are responsible for using the correct TxnManager. We do not look at * SessionState.get().getTxnMgr().supportsAcid() here * @param table table @@ -640,23 +635,7 @@ public class AcidUtils { if (tableIsTransactional == null) { tableIsTransactional = table.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); } - if (tableIsTransactional == null || !tableIsTransactional.equalsIgnoreCase("true")) { - return false; - } - - List<String> bucketCols = table.getBucketCols(); - if (bucketCols == null || bucketCols.isEmpty()) { - return false; - } - - Class<? extends InputFormat> inputFormatClass = table.getInputFormatClass(); - Class<? extends OutputFormat> outputFormatClass = table.getOutputFormatClass(); - if (inputFormatClass == null || outputFormatClass == null || - !AcidInputFormat.class.isAssignableFrom(inputFormatClass) || - !AcidOutputFormat.class.isAssignableFrom(outputFormatClass)) { - return false; - } - return true; + return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } } http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index a16a788..44b77e7 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -331,14 +331,7 @@ public class TestTxnCommands2 { // 4. Perform a major compaction runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); - Worker w = new Worker(); - w.setThreadId((int) w.getId()); - w.setHiveConf(hiveConf); - AtomicBoolean stop = new AtomicBoolean(); - AtomicBoolean looped = new AtomicBoolean(); - stop.set(true); - w.init(stop, looped); - w.run(); + runWorker(hiveConf); // There should be 1 new directory: base_xxxxxxx. // Original bucket files and delta directory should stay until Cleaner kicks in. status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + @@ -375,14 +368,7 @@ public class TestTxnCommands2 { // Before Cleaner, there should be 5 items: // 2 original files, 1 original directory, 1 base directory and 1 delta directory Assert.assertEquals(5, status.length); - Cleaner c = new Cleaner(); - c.setThreadId((int) c.getId()); - c.setHiveConf(hiveConf); - stop = new AtomicBoolean(); - looped = new AtomicBoolean(); - stop.set(true); - c.init(stop, looped); - c.run(); + runCleaner(hiveConf); // There should be only 1 directory left: base_xxxxxxx. // Original bucket files and delta directory should have been cleaned up. status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + @@ -596,7 +582,7 @@ public class TestTxnCommands2 { } return compactionsByState; } - private static void runWorker(HiveConf hiveConf) throws MetaException { + public static void runWorker(HiveConf hiveConf) throws MetaException { AtomicBoolean stop = new AtomicBoolean(true); Worker t = new Worker(); t.setThreadId((int) t.getId()); @@ -605,7 +591,7 @@ public class TestTxnCommands2 { t.init(stop, looped); t.run(); } - private static void runCleaner(HiveConf hiveConf) throws MetaException { + public static void runCleaner(HiveConf hiveConf) throws MetaException { AtomicBoolean stop = new AtomicBoolean(true); Cleaner t = new Cleaner(); t.setThreadId((int) t.getId()); http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 685f42a..5b775f9 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -277,6 +277,192 @@ public class TestDbTxnManager2 { Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations.")); } + /** + * Normally the compaction process will clean up records in TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, + * COMPACTION_QUEUE and COMPLETED_COMPACTIONS. But if a table/partition has been dropped before + * compaction and there are still relevant records in those metastore tables, the Initiator will + * complain about not being able to find the table/partition. This method is to test and make sure + * we clean up relevant records as soon as a table/partition is dropped. + * + * Note, here we don't need to worry about cleaning up TXNS table, since it's handled separately. + * @throws Exception + */ + @Test + public void testMetastoreTablesCleanup() throws Exception { + CommandProcessorResponse cpr = driver.run("create database if not exists temp"); + checkCmdOnDriver(cpr); + + // Create some ACID tables: T10, T11 - unpartitioned table, T12p, T13p - partitioned table + cpr = driver.run("create table temp.T10 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + cpr = driver.run("create table temp.T11 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + cpr = driver.run("create table temp.T12p (a int, b int) partitioned by (ds string, hour string) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + cpr = driver.run("create table temp.T13p (a int, b int) partitioned by (ds string, hour string) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + + // Successfully insert some data into ACID tables, so that we have records in COMPLETED_TXN_COMPONENTS + cpr = driver.run("insert into temp.T10 values (1, 1)"); + checkCmdOnDriver(cpr); + cpr = driver.run("insert into temp.T10 values (2, 2)"); + checkCmdOnDriver(cpr); + cpr = driver.run("insert into temp.T11 values (3, 3)"); + checkCmdOnDriver(cpr); + cpr = driver.run("insert into temp.T11 values (4, 4)"); + checkCmdOnDriver(cpr); + cpr = driver.run("insert into temp.T12p partition (ds='today', hour='1') values (5, 5)"); + checkCmdOnDriver(cpr); + cpr = driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (6, 6)"); + checkCmdOnDriver(cpr); + cpr = driver.run("insert into temp.T13p partition (ds='today', hour='1') values (7, 7)"); + checkCmdOnDriver(cpr); + cpr = driver.run("insert into temp.T13p partition (ds='tomorrow', hour='2') values (8, 8)"); + checkCmdOnDriver(cpr); + int count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11')"); + Assert.assertEquals(4, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t12p', 't13p')"); + Assert.assertEquals(4, count); + + // Fail some inserts, so that we have records in TXN_COMPONENTS + conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); + cpr = driver.run("insert into temp.T10 values (9, 9)"); + checkCmdOnDriver(cpr); + cpr = driver.run("insert into temp.T11 values (10, 10)"); + checkCmdOnDriver(cpr); + cpr = driver.run("insert into temp.T12p partition (ds='today', hour='1') values (11, 11)"); + checkCmdOnDriver(cpr); + cpr = driver.run("insert into temp.T13p partition (ds='today', hour='1') values (12, 12)"); + checkCmdOnDriver(cpr); + count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')"); + Assert.assertEquals(4, count); + conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); + + // Drop a table/partition; corresponding records in TXN_COMPONENTS and COMPLETED_TXN_COMPONENTS should disappear + count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t10'"); + Assert.assertEquals(1, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t10'"); + Assert.assertEquals(2, count); + cpr = driver.run("drop table temp.T10"); + checkCmdOnDriver(cpr); + count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t10'"); + Assert.assertEquals(0, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t10'"); + Assert.assertEquals(0, count); + + count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'"); + Assert.assertEquals(1, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'"); + Assert.assertEquals(1, count); + cpr = driver.run("alter table temp.T12p drop partition (ds='today', hour='1')"); + checkCmdOnDriver(cpr); + count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'"); + Assert.assertEquals(0, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'"); + Assert.assertEquals(0, count); + + // Successfully perform compaction on a table/partition, so that we have successful records in COMPLETED_COMPACTIONS + cpr = driver.run("alter table temp.T11 compact 'minor'"); + checkCmdOnDriver(cpr); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='i'"); + Assert.assertEquals(1, count); + org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='r' and CQ_TYPE='i'"); + Assert.assertEquals(1, count); + org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'"); + Assert.assertEquals(0, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='s' and CC_TYPE='i'"); + Assert.assertEquals(1, count); + + cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'minor'"); + checkCmdOnDriver(cpr); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='i'"); + Assert.assertEquals(1, count); + org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='r' and CQ_TYPE='i'"); + Assert.assertEquals(1, count); + org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'"); + Assert.assertEquals(0, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='s' and CC_TYPE='i'"); + Assert.assertEquals(1, count); + + // Fail compaction, so that we have failed records in COMPLETED_COMPACTIONS + conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true); + cpr = driver.run("alter table temp.T11 compact 'major'"); + checkCmdOnDriver(cpr); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); + Assert.assertEquals(1, count); + org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); + Assert.assertEquals(0, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='f' and CC_TYPE='a'"); + Assert.assertEquals(1, count); + + cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'major'"); + checkCmdOnDriver(cpr); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); + Assert.assertEquals(1, count); + org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); + Assert.assertEquals(0, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='f' and CC_TYPE='a'"); + Assert.assertEquals(1, count); + conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false); + + // Put 2 records into COMPACTION_QUEUE and do nothing + cpr = driver.run("alter table temp.T11 compact 'major'"); + checkCmdOnDriver(cpr); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'"); + Assert.assertEquals(1, count); + cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'major'"); + checkCmdOnDriver(cpr); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'"); + Assert.assertEquals(1, count); + + // Drop a table/partition, corresponding records in COMPACTION_QUEUE and COMPLETED_COMPACTIONS should disappear + cpr = driver.run("drop table temp.T11"); + checkCmdOnDriver(cpr); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'"); + Assert.assertEquals(0, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11'"); + Assert.assertEquals(0, count); + + cpr = driver.run("alter table temp.T12p drop partition (ds='tomorrow', hour='2')"); + checkCmdOnDriver(cpr); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'"); + Assert.assertEquals(0, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p'"); + Assert.assertEquals(0, count); + + // Put 1 record into COMPACTION_QUEUE and do nothing + cpr = driver.run("alter table temp.T13p partition (ds='today', hour='1') compact 'major'"); + checkCmdOnDriver(cpr); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t13p' and CQ_STATE='i' and CQ_TYPE='a'"); + Assert.assertEquals(1, count); + + // Drop database, everything in all 4 meta tables should disappear + count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')"); + Assert.assertEquals(1, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')"); + Assert.assertEquals(2, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')"); + Assert.assertEquals(1, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')"); + Assert.assertEquals(0, count); + cpr = driver.run("drop database if exists temp cascade"); + checkCmdOnDriver(cpr); + count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')"); + Assert.assertEquals(0, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')"); + Assert.assertEquals(0, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')"); + Assert.assertEquals(0, count); + count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')"); + Assert.assertEquals(0, count); + } + private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) { Assert.assertEquals(l.toString(),l.getType(), type); Assert.assertEquals(l.toString(),l.getState(), state);