Repository: hive
Updated Branches:
  refs/heads/branch-1 73a677be3 -> 214e4b6ff


HIVE-10632 : Make sure TXN_COMPONENTS gets cleaned up if table is dropped before compaction (Wei Zheng, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/214e4b6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/214e4b6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/214e4b6f

Branch: refs/heads/branch-1
Commit: 214e4b6ffedbdc0f610babcf1156cd32f0659db3
Parents: 73a677b
Author: Wei Zheng <w...@apache.org>
Authored: Thu Mar 10 16:57:26 2016 -0800
Committer: Wei Zheng <w...@apache.org>
Committed: Thu Mar 10 16:57:26 2016 -0800

----------------------------------------------------------------------
 .../hive/metastore/AcidEventListener.java       |  69 +++++++
 .../hadoop/hive/metastore/HiveMetaStore.java    |   1 +
 .../hadoop/hive/metastore/txn/TxnDbUtil.java    |  20 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 183 +++++++++++++++++-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |  25 +--
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |  22 +--
 .../hive/ql/lockmgr/TestDbTxnManager2.java      | 186 +++++++++++++++++++
 7 files changed, 460 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
new file mode 100644
index 0000000..767bc54
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+
+
+/**
+ * Handles cleanup of dropped partition/table/database records in ACID-related metastore tables
+ */
+public class AcidEventListener extends MetaStoreEventListener {
+
+  private TxnHandler txnHandler;
+  private HiveConf hiveConf;
+
+  public AcidEventListener(Configuration configuration) {
+    super(configuration);
+    hiveConf = (HiveConf) configuration;
+  }
+
+  @Override
+  public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException {
+    // We can loop thru all the tables to check if they are ACID first and then perform cleanup,
+    // but it's more efficient to unconditionally perform cleanup for the database, especially
+    // when there are a lot of tables
+    txnHandler = new TxnHandler(hiveConf);
+    txnHandler.cleanupRecords(HiveObjectType.DATABASE, dbEvent.getDatabase(), null, null);
+  }
+
+  @Override
+  public void onDropTable(DropTableEvent tableEvent)  throws MetaException {
+    if (TxnHandler.isAcidTable(tableEvent.getTable())) {
+      txnHandler = new TxnHandler(hiveConf);
+      txnHandler.cleanupRecords(HiveObjectType.TABLE, null, tableEvent.getTable(), null);
+    }
+  }
+
+  @Override
+  public void onDropPartition(DropPartitionEvent partitionEvent)  throws MetaException {
+    if (TxnHandler.isAcidTable(partitionEvent.getTable())) {
+      txnHandler = new TxnHandler(hiveConf);
+      txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(),
+          partitionEvent.getPartitionIterator());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index b702caa..fba545d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -488,6 +488,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
      listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, hiveConf,
           hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
       listeners.add(new SessionPropertiesListener(hiveConf));
+      listeners.add(new AcidEventListener(hiveConf));
       endFunctionListeners = MetaStoreUtils.getMetaStoreListeners(
           MetaStoreEndFunctionListener.class, hiveConf,
           hiveConf.getVar(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS));

http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 5704b5f..17e7c85 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -32,7 +32,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.ShimLoader;
 
 /**
- * Utility methods for creating and destroying txn database/schema.
+ * Utility methods for creating and destroying txn database/schema, plus methods for
+ * querying against metastore tables.
  * Placed here in a separate class so it can be shared across unit tests.
  */
 public final class TxnDbUtil {
@@ -142,8 +143,13 @@ public final class TxnDbUtil {
 
       conn.commit();
     } catch (SQLException e) {
+      try {
+        conn.rollback();
+      } catch (SQLException re) {
+        System.err.println("Error rolling back: " + re.getMessage());
+      }
+
       // This might be a deadlock, if so, let's retry
-      conn.rollback();
       if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) {
         LOG.warn("Caught deadlock, retrying db creation");
         prepDb();
@@ -219,14 +225,20 @@ public final class TxnDbUtil {
     }
   }
 
-  public static int findNumCurrentLocks() throws Exception {
+  /**
+   * Utility method used to run COUNT queries like "select count(*) from ..." against metastore tables
+   * @param countQuery the count query text
+   * @return the count query result
+   * @throws Exception
+   */
+  public static int countQueryAgent(String countQuery) throws Exception {
     Connection conn = null;
     Statement stmt = null;
     ResultSet rs = null;
     try {
       conn = getConnection();
       stmt = conn.createStatement();
-      rs = stmt.executeQuery("select count(*) from hive_locks");
+      rs = stmt.executeQuery(countQuery);
       if (!rs.next()) {
         return 0;
       }

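A usage sketch for the generalized helper (test code; the first query reproduces the removed findNumCurrentLocks(), the second mirrors the assertions added in TestDbTxnManager2 below; the wrapper class name is hypothetical):

    import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;

    public class CountQuerySketch {
      public static void main(String[] args) throws Exception {
        // Equivalent of the removed findNumCurrentLocks():
        int numLocks = TxnDbUtil.countQueryAgent("select count(*) from hive_locks");
        // The new use case: counting leftover ACID metadata rows for a dropped table.
        int leftover = TxnDbUtil.countQueryAgent(
            "select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t10'");
        System.out.println(numLocks + " locks, " + leftover + " leftover rows");
      }
    }
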
http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index fd66415..59c1637 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.StringUtils;
@@ -1180,6 +1181,186 @@ public class TxnHandler {
   }
 
   /**
+   * Clean up corresponding records in metastore tables, specifically:
+   * TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS
+   */
+  public void cleanupRecords(HiveObjectType type, Database db, Table table,
+                             Iterator<Partition> partitionIterator) throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+
+      try {
+        String dbName;
+        String tblName;
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        List<String> queries = new ArrayList<String>();
+        StringBuilder buff = new StringBuilder();
+
+        switch (type) {
+          case DATABASE:
+            dbName = db.getName();
+
+            buff.append("delete from TXN_COMPONENTS where tc_database='");
+            buff.append(dbName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPLETED_TXN_COMPONENTS where 
ctc_database='");
+            buff.append(dbName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPACTION_QUEUE where cq_database='");
+            buff.append(dbName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPLETED_COMPACTIONS where 
cc_database='");
+            buff.append(dbName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            break;
+          case TABLE:
+            dbName = table.getDbName();
+            tblName = table.getTableName();
+
+            buff.append("delete from TXN_COMPONENTS where tc_database='");
+            buff.append(dbName);
+            buff.append("' and tc_table='");
+            buff.append(tblName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPLETED_TXN_COMPONENTS where 
ctc_database='");
+            buff.append(dbName);
+            buff.append("' and ctc_table='");
+            buff.append(tblName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPACTION_QUEUE where cq_database='");
+            buff.append(dbName);
+            buff.append("' and cq_table='");
+            buff.append(tblName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            buff.setLength(0);
+            buff.append("delete from COMPLETED_COMPACTIONS where 
cc_database='");
+            buff.append(dbName);
+            buff.append("' and cc_table='");
+            buff.append(tblName);
+            buff.append("'");
+            queries.add(buff.toString());
+
+            break;
+          case PARTITION:
+            dbName = table.getDbName();
+            tblName = table.getTableName();
+            List<FieldSchema> partCols = table.getPartitionKeys();  // 
partition columns
+            List<String> partVals;                                  // 
partition values
+            String partName;
+
+            while (partitionIterator.hasNext()) {
+              Partition p = partitionIterator.next();
+              partVals = p.getValues();
+              partName = Warehouse.makePartName(partCols, partVals);
+
+              buff.append("delete from TXN_COMPONENTS where tc_database='");
+              buff.append(dbName);
+              buff.append("' and tc_table='");
+              buff.append(tblName);
+              buff.append("' and tc_partition='");
+              buff.append(partName);
+              buff.append("'");
+              queries.add(buff.toString());
+
+              buff.setLength(0);
+              buff.append("delete from COMPLETED_TXN_COMPONENTS where 
ctc_database='");
+              buff.append(dbName);
+              buff.append("' and ctc_table='");
+              buff.append(tblName);
+              buff.append("' and ctc_partition='");
+              buff.append(partName);
+              buff.append("'");
+              queries.add(buff.toString());
+
+              buff.setLength(0);
+              buff.append("delete from COMPACTION_QUEUE where cq_database='");
+              buff.append(dbName);
+              buff.append("' and cq_table='");
+              buff.append(tblName);
+              buff.append("' and cq_partition='");
+              buff.append(partName);
+              buff.append("'");
+              queries.add(buff.toString());
+
+              buff.setLength(0);
+              buff.append("delete from COMPLETED_COMPACTIONS where 
cc_database='");
+              buff.append(dbName);
+              buff.append("' and cc_table='");
+              buff.append(tblName);
+              buff.append("' and cc_partition='");
+              buff.append(partName);
+              buff.append("'");
+              queries.add(buff.toString());
+            }
+
+            break;
+          default:
+            throw new MetaException("Invalid object type for cleanup: " + 
type);
+        }
+
+        for (String query : queries) {
+          LOG.debug("Going to execute update <" + query + ">");
+          stmt.executeUpdate(query);
+        }
+
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "cleanupRecords");
+        if (e.getMessage().contains("does not exist")) {
+          LOG.warn("Cannot perform cleanup since metastore table does not 
exist");
+        } else {
+          throw new MetaException("Unable to clean up " + 
StringUtils.stringifyException(e));
+        }
+      } finally {
+        closeStmt(stmt);
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      cleanupRecords(type, db, table, partitionIterator);
+    }
+  }
+
+
+  /** Checks if a table is a valid ACID table.
+   * Note, users are responsible for using the correct TxnManager. We do not look at
+   * SessionState.get().getTxnMgr().supportsAcid() here
+   * @param table table
+   * @return true if table is a legit ACID table, false otherwise
+   */
+  public static boolean isAcidTable(Table table) {
+    if (table == null) {
+      return false;
+    }
+    Map<String, String> parameters = table.getParameters();
+    String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
+  }
+
+  /**
    * For testing only, do not use.
    */
   @VisibleForTesting
@@ -1626,7 +1807,7 @@ public class TxnHandler {
         TxnDbUtil.prepDb();
       } catch (Exception e) {
        // We may have already created the tables and thus don't need to redo it.
-        if (!e.getMessage().contains("already exists")) {
+        if (e.getMessage() != null && !e.getMessage().contains("already exists")) {
          throw new RuntimeException("Unable to set up transaction database for" +
             " testing: " + e.getMessage(), e);
         }

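cleanupRecords is driven by the AcidEventListener above; a minimal sketch of the call shape in isolation (hypothetical wiring, assuming a metastore-side Table object and a configured HiveConf):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.api.HiveObjectType;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.hadoop.hive.metastore.txn.TxnHandler;

    public class CleanupSketch {
      // Mirrors AcidEventListener.onDropTable: only ACID tables need scrubbing.
      static void cleanupAfterDrop(HiveConf hiveConf, Table table) throws MetaException {
        if (TxnHandler.isAcidTable(table)) {
          TxnHandler txnHandler = new TxnHandler(hiveConf);
          txnHandler.cleanupRecords(HiveObjectType.TABLE, null, table, null);
        }
      }
    }
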
http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index d98fa93..15a61d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -621,12 +621,7 @@ public class AcidUtils {
    HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable);
   }
 
-  /** Checks metadata to make sure it's a valid ACID table at metadata level
-   * Three things we will check:
-   * 1. TBLPROPERTIES 'transactional'='true'
-   * 2. The table should be bucketed
-   * 3. InputFormatClass/OutputFormatClass should implement AcidInputFormat/AcidOutputFormat
-   *    Currently OrcInputFormat/OrcOutputFormat is the only implementer
+  /** Checks if a table is a valid ACID table.
   * Note, users are responsible for using the correct TxnManager. We do not look at
    * SessionState.get().getTxnMgr().supportsAcid() here
    * @param table table
@@ -640,23 +635,7 @@ public class AcidUtils {
     if (tableIsTransactional == null) {
      tableIsTransactional = table.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
     }
-    if (tableIsTransactional == null || !tableIsTransactional.equalsIgnoreCase("true")) {
-      return false;
-    }
-
-    List<String> bucketCols = table.getBucketCols();
-    if (bucketCols == null || bucketCols.isEmpty()) {
-      return false;
-    }
-
-    Class<? extends InputFormat> inputFormatClass = 
table.getInputFormatClass();
-    Class<? extends OutputFormat> outputFormatClass = 
table.getOutputFormatClass();
-    if (inputFormatClass == null || outputFormatClass == null ||
-        !AcidInputFormat.class.isAssignableFrom(inputFormatClass) ||
-        !AcidOutputFormat.class.isAssignableFrom(outputFormatClass)) {
-      return false;
-    }
 
-    return true;
+    return tableIsTransactional != null && 
tableIsTransactional.equalsIgnoreCase("true");
   }
 }

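With the bucketing and AcidInputFormat/AcidOutputFormat checks removed, the ql-side test agrees with the metastore-side TxnHandler.isAcidTable added above. A standalone sketch of the resulting predicate (a plain map stands in for the Table wrapper; hive_metastoreConstants.TABLE_IS_TRANSACTIONAL resolves to "transactional"):

    import java.util.Map;

    public class AcidCheckSketch {
      // Sketch of the relaxed check: only the 'transactional' property matters.
      // The ql-side method additionally consults the upper-cased key for
      // backward compatibility with older metadata, preserved here.
      static boolean isTransactional(Map<String, String> params) {
        if (params == null) {
          return false;
        }
        String v = params.get("transactional");
        if (v == null) {
          v = params.get("TRANSACTIONAL");
        }
        return v != null && v.equalsIgnoreCase("true");
      }
    }
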
http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index a16a788..44b77e7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -331,14 +331,7 @@ public class TestTxnCommands2 {
 
     // 4. Perform a major compaction
     runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 
'MAJOR'");
-    Worker w = new Worker();
-    w.setThreadId((int) w.getId());
-    w.setHiveConf(hiveConf);
-    AtomicBoolean stop = new AtomicBoolean();
-    AtomicBoolean looped = new AtomicBoolean();
-    stop.set(true);
-    w.init(stop, looped);
-    w.run();
+    runWorker(hiveConf);
     // There should be 1 new directory: base_xxxxxxx.
    // Original bucket files and delta directory should stay until Cleaner kicks in.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
@@ -375,14 +368,7 @@ public class TestTxnCommands2 {
     // Before Cleaner, there should be 5 items:
    // 2 original files, 1 original directory, 1 base directory and 1 delta directory
     Assert.assertEquals(5, status.length);
-    Cleaner c = new Cleaner();
-    c.setThreadId((int) c.getId());
-    c.setHiveConf(hiveConf);
-    stop = new AtomicBoolean();
-    looped = new AtomicBoolean();
-    stop.set(true);
-    c.init(stop, looped);
-    c.run();
+    runCleaner(hiveConf);
     // There should be only 1 directory left: base_xxxxxxx.
     // Original bucket files and delta directory should have been cleaned up.
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
@@ -596,7 +582,7 @@ public class TestTxnCommands2 {
     }
     return compactionsByState;
   }
-  private static void runWorker(HiveConf hiveConf) throws MetaException {
+  public static void runWorker(HiveConf hiveConf) throws MetaException {
     AtomicBoolean stop = new AtomicBoolean(true);
     Worker t = new Worker();
     t.setThreadId((int) t.getId());
@@ -605,7 +591,7 @@ public class TestTxnCommands2 {
     t.init(stop, looped);
     t.run();
   }
-  private static void runCleaner(HiveConf hiveConf) throws MetaException {
+  public static void runCleaner(HiveConf hiveConf) throws MetaException {
     AtomicBoolean stop = new AtomicBoolean(true);
     Cleaner t = new Cleaner();
     t.setThreadId((int) t.getId());

http://git-wip-us.apache.org/repos/asf/hive/blob/214e4b6f/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 685f42a..5b775f9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -277,6 +277,192 @@ public class TestDbTxnManager2 {
    Assert.assertTrue(cpr.getErrorMessage().contains("Attempt to do update or delete using transaction manager that does not support these operations."));
   }
 
+  /**
+   * Normally the compaction process will clean up records in TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS,
+   * COMPACTION_QUEUE and COMPLETED_COMPACTIONS. But if a table/partition has been dropped before
+   * compaction and there are still relevant records in those metastore tables, the Initiator will
+   * complain about not being able to find the table/partition. This test makes sure we clean up
+   * relevant records as soon as a table/partition is dropped.
+   *
+   * Note, here we don't need to worry about cleaning up the TXNS table, since it's handled separately.
+   * @throws Exception
+   */
+  @Test
+  public void testMetastoreTablesCleanup() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create database if not exists temp");
+    checkCmdOnDriver(cpr);
+
+    // Create some ACID tables: T10, T11 - unpartitioned tables; T12p, T13p - partitioned tables
+    cpr = driver.run("create table temp.T10 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table temp.T11 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table temp.T12p (a int, b int) partitioned by (ds string, hour string) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table temp.T13p (a int, b int) partitioned by (ds string, hour string) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+
+    // Successfully insert some data into ACID tables, so that we have records in COMPLETED_TXN_COMPONENTS
+    cpr = driver.run("insert into temp.T10 values (1, 1)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T10 values (2, 2)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T11 values (3, 3)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T11 values (4, 4)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T12p partition (ds='today', hour='1') 
values (5, 5)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T12p partition (ds='tomorrow', 
hour='2') values (6, 6)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T13p partition (ds='today', hour='1') 
values (7, 7)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T13p partition (ds='tomorrow', 
hour='2') values (8, 8)");
+    checkCmdOnDriver(cpr);
+    int count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11')");
+    Assert.assertEquals(4, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t12p', 't13p')");
+    Assert.assertEquals(4, count);
+
+    // Fail some inserts, so that we have records in TXN_COMPONENTS
+    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    cpr = driver.run("insert into temp.T10 values (9, 9)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T11 values (10, 10)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T12p partition (ds='today', hour='1') 
values (11, 11)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("insert into temp.T13p partition (ds='today', hour='1') 
values (12, 12)");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS 
where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(4, count);
+    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+
+    // Drop a table/partition; corresponding records in TXN_COMPONENTS and COMPLETED_TXN_COMPONENTS should disappear
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t10'");
+    Assert.assertEquals(1, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t10'");
+    Assert.assertEquals(2, count);
+    cpr = driver.run("drop table temp.T10");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t10'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t10'");
+    Assert.assertEquals(0, count);
+
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'");
+    Assert.assertEquals(1, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'");
+    Assert.assertEquals(1, count);
+    cpr = driver.run("alter table temp.T12p drop partition (ds='today', hour='1')");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE='t12p' and TC_PARTITION='ds=today/hour=1'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE='t12p' and CTC_PARTITION='ds=today/hour=1'");
+    Assert.assertEquals(0, count);
+
+    // Successfully perform compaction on a table/partition, so that we have successful records in COMPLETED_COMPACTIONS
+    cpr = driver.run("alter table temp.T11 compact 'minor'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='i'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='r' and CQ_TYPE='i'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='s' and CC_TYPE='i'");
+    Assert.assertEquals(1, count);
+
+    cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', 
hour='2') compact 'minor'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE 
where CQ_DATABASE='temp' and CQ_TABLE='t12p' and 
CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='i'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE 
where CQ_DATABASE='temp' and CQ_TABLE='t12p' and 
CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='r' and CQ_TYPE='i'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner(conf);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE 
where CQ_DATABASE='temp' and CQ_TABLE='t12p'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from 
COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and 
CC_STATE='s' and CC_TYPE='i'");
+    Assert.assertEquals(1, count);
+
+    // Fail compaction, so that we have failed records in COMPLETED_COMPACTIONS
+    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+    cpr = driver.run("alter table temp.T11 compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11' and CC_STATE='f' and CC_TYPE='a'");
+    Assert.assertEquals(1, count);
+
+    cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', 
hour='2') compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE 
where CQ_DATABASE='temp' and CQ_TABLE='t12p' and 
CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+    org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker(conf); // will fail
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE 
where CQ_DATABASE='temp' and CQ_TABLE='t12p' and 
CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from 
COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and 
CC_STATE='f' and CC_TYPE='a'");
+    Assert.assertEquals(1, count);
+    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
+
+    // Put 2 records into COMPACTION_QUEUE and do nothing
+    cpr = driver.run("alter table temp.T11 compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+    cpr = driver.run("alter table temp.T12p partition (ds='tomorrow', hour='2') compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p' and CQ_PARTITION='ds=tomorrow/hour=2' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+
+    // Drop a table/partition, corresponding records in COMPACTION_QUEUE and COMPLETED_COMPACTIONS should disappear
+    cpr = driver.run("drop table temp.T11");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t11'");
+    Assert.assertEquals(0, count);
+
+    cpr = driver.run("alter table temp.T12p drop partition (ds='tomorrow', hour='2')");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t12p'");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p'");
+    Assert.assertEquals(0, count);
+
+    // Put 1 record into COMPACTION_QUEUE and do nothing
+    cpr = driver.run("alter table temp.T13p partition (ds='today', hour='1') 
compact 'major'");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE 
where CQ_DATABASE='temp' and CQ_TABLE='t13p' and CQ_STATE='i' and CQ_TYPE='a'");
+    Assert.assertEquals(1, count);
+
+    // Drop database, everything in all 4 meta tables should disappear
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(1, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(2, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(1, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+    cpr = driver.run("drop database if exists temp cascade");
+    checkCmdOnDriver(cpr);
+    count = TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where TC_DATABASE='temp' and TC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+    count = TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE in ('t10', 't11', 't12p', 't13p')");
+    Assert.assertEquals(0, count);
+  }
+
  private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
     Assert.assertEquals(l.toString(),l.getType(), type);
     Assert.assertEquals(l.toString(),l.getState(), state);
