Repository: hive Updated Branches: refs/heads/branch-3 bf951b7e5 -> c7f713d57
HIVE-18748: Rename table impacts the ACID behavior as table names are not updated in meta-tables. (Eugene Koifman, reviewed by Sankar Hariappan) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c7f713d5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c7f713d5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c7f713d5 Branch: refs/heads/branch-3 Commit: c7f713d572ce95d7bbc98892190c1b5ebf2627f3 Parents: bf951b7 Author: Eugene Koifman <ekoif...@apache.org> Authored: Tue May 29 10:45:53 2018 -0700 Committer: Eugene Koifman <ekoif...@apache.org> Committed: Tue May 29 10:45:53 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/TestTxnConcatenate.java | 61 +++++++ .../hive/metastore/AcidEventListener.java | 51 ++++++ .../hadoop/hive/metastore/HiveAlterHandler.java | 9 + .../hadoop/hive/metastore/txn/TxnHandler.java | 177 +++++++++++++++++++ .../hadoop/hive/metastore/txn/TxnStore.java | 5 + 5 files changed, 303 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c7f713d5/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java index 2663fec..511198a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java @@ -21,8 +21,11 @@ package org.apache.hadoop.hive.ql; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import 
org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -174,4 +177,62 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests { "t/base_0000002/000000_0"}}; checkResult(expected2, testQuery, false, "check data after concatenate", LOG); } + @Test + public void testRenameTable() throws Exception { + MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); + runStatementOnDriver("drop database if exists mydb1 cascade"); + runStatementOnDriver("drop database if exists mydb2 cascade"); + runStatementOnDriver("create database mydb1"); + runStatementOnDriver("create database mydb2"); + runStatementOnDriver("create table mydb1.T(a int, b int) stored as orc"); + runStatementOnDriver("insert into mydb1.T values(1,2),(4,5)"); + //put something in WRITE_SET + runStatementOnDriver("update mydb1.T set b = 6 where b = 5"); + runStatementOnDriver("alter table mydb1.T compact 'minor'"); + + runStatementOnDriver("alter table mydb1.T RENAME TO mydb1.S"); + + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from mydb1.S"; + String[][] expected = new String[][] { + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", + "s/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6", + "s/delta_0000002_0000002_0000/bucket_00000"}}; + checkResult(expected, testQuery, false, "check data", LOG); + + + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPACTION_QUEUE where CQ_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from WRITE_SET where WS_TABLE='t'")); + Assert.assertEquals(0, 
TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='t'")); + Assert.assertEquals(0, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from NEXT_WRITE_ID where NWI_TABLE='t'")); + + Assert.assertEquals( + TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), 2, + TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from WRITE_SET where WS_TABLE='s'")); + Assert.assertEquals(2, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'")); + Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from NEXT_WRITE_ID where NWI_TABLE='s'")); + + //this causes MetaStoreEventListener.onDropTable()/onCreateTable() to execute and the data + //files are just moved under new table. This can't work since a drop table in Acid removes + //the relevant table metadata (like writeid, etc.), so writeIds in file names/ROW_IDs + //no longer make sense. (In fact 'select ...' returns nothing since there is no NEXT_WRITE_ID + //entry for the 'new' table and all existing data is 'above HWM'. 
see HIVE-19569 + CommandProcessorResponse cpr = + runStatementOnDriverNegative("alter table mydb1.S RENAME TO mydb2.bar"); + Assert.assertTrue(cpr.getErrorMessage() != null && cpr.getErrorMessage() + .contains("Changing database name of a transactional table mydb1.s is not supported.")); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/c7f713d5/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java index f849b1a..5279247 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java @@ -19,10 +19,16 @@ package org.apache.hadoop.hive.metastore; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.HiveObjectType; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent; +import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; @@ -69,6 +75,51 @@ public class AcidEventListener extends MetaStoreEventListener { } } + @Override + public void onAlterTable(AlterTableEvent 
tableEvent) throws MetaException { + if (!TxnUtils.isTransactionalTable(tableEvent.getNewTable())) { + return; + } + Table oldTable = tableEvent.getOldTable(); + Table newTable = tableEvent.getNewTable(); + if(!oldTable.getCatName().equalsIgnoreCase(newTable.getCatName()) || + !oldTable.getDbName().equalsIgnoreCase(newTable.getDbName()) || + !oldTable.getTableName().equalsIgnoreCase(newTable.getTableName())) { + txnHandler = getTxnHandler(); + txnHandler.onRename( + oldTable.getCatName(), oldTable.getDbName(), oldTable.getTableName(), null, + newTable.getCatName(), newTable.getDbName(), newTable.getTableName(), null); + } + } + @Override + public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException { + if (!TxnUtils.isTransactionalTable(partitionEvent.getTable())) { + return; + } + Partition oldPart = partitionEvent.getOldPartition(); + Partition newPart = partitionEvent.getNewPartition(); + Table t = partitionEvent.getTable(); + String oldPartName = Warehouse.makePartName(t.getPartitionKeys(), oldPart.getValues()); + String newPartName = Warehouse.makePartName(t.getPartitionKeys(), newPart.getValues()); + if(!oldPartName.equals(newPartName)) { + txnHandler = getTxnHandler(); + txnHandler.onRename(t.getCatName(), t.getDbName(), t.getTableName(), oldPartName, + t.getCatName(), t.getDbName(), t.getTableName(), newPartName); + } + } + @Override + public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException { + Database oldDb = dbEvent.getOldDatabase(); + Database newDb = dbEvent.getNewDatabase(); + if(!oldDb.getCatalogName().equalsIgnoreCase(newDb.getCatalogName()) || + !oldDb.getName().equalsIgnoreCase(newDb.getName())) { + txnHandler = getTxnHandler(); + txnHandler.onRename( + oldDb.getCatalogName(), oldDb.getName(), null, null, + newDb.getCatalogName(), newDb.getName(), null, null); + } + } + private TxnStore getTxnHandler() { boolean hackOn = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || 
MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST); http://git-wip-us.apache.org/repos/asf/hive/blob/c7f713d5/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 0be0aaa..9ab9e85 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -421,6 +421,15 @@ public class HiveAlterHandler implements AlterHandler { new AlterTableEvent(oldt, newt, false, success, handler), environmentContext, txnAlterTableEventResponses, msdb); } else { + if(oldt.getParameters() != null && "true".equalsIgnoreCase( + oldt.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL))) { + /*Why does it split Alter into Drop + Create here????? This causes onDropTable logic + * to wipe out acid related metadata and writeIds from old table don't make sense + * in the new table.*/ + throw new IllegalStateException("Changing database name of a transactional table " + + Warehouse.getQualifiedName(oldt) + " is not supported. 
Please use create-table-as" + + " or create new table manually followed by Insert."); + } MetaStoreListenerNotifier.notifyEvent(listeners, EventMessage.EventType.DROP_TABLE, new DropTableEvent(oldt, true, false, handler), environmentContext, txnDropTableEventResponses, msdb); http://git-wip-us.apache.org/repos/asf/hive/blob/c7f713d5/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index c513b4d..21b9865 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -2566,6 +2566,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * specifically: TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS * Retry-by-caller note: this is only idempotent assuming it's only called by dropTable/Db/etc * operations. + * + * HIVE_LOCKS is (presumably) expected to be removed by AcidHouseKeeperService + * WRITE_SET is (presumably) expected to be removed by AcidWriteSetService */ @Override @RetrySemantics.Idempotent @@ -2760,7 +2763,181 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { cleanupRecords(type, db, table, partitionIterator); } } + /** + * Catalog hasn't been added to transactional tables yet, so it's passed in but not used. 
+ */ + @Override + public void onRename(String oldCatName, String oldDbName, String oldTabName, String oldPartName, + String newCatName, String newDbName, String newTabName, String newPartName) + throws MetaException { + String callSig = "onRename(" + + oldCatName + "," + oldDbName + "," + oldTabName + "," + oldPartName + "," + + newCatName + "," + newDbName + "," + newTabName + "," + newPartName + ")"; + + if(newPartName != null) { + assert oldPartName != null && oldTabName != null && oldDbName != null && oldCatName != null : + callSig; + } + if(newTabName != null) { + assert oldTabName != null && oldDbName != null && oldCatName != null : callSig; + } + if(newDbName != null) { + assert oldDbName != null && oldCatName != null : callSig; + } + + try { + Connection dbConn = null; + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + List<String> queries = new ArrayList<>(); + + String update = "update TXN_COMPONENTS set "; + String where = " where "; + if(oldPartName != null) { + update += "TC_PARTITION = " + quoteString(newPartName) + ", "; + where += "TC_PARTITION = " + quoteString(oldPartName) + " AND "; + } + if(oldTabName != null) { + update += "TC_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "TC_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + } + if(oldDbName != null) { + update += "TC_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "TC_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + + update = "update COMPLETED_TXN_COMPONENTS set "; + where = " where "; + if(oldPartName != null) { + update += "CTC_PARTITION = " + quoteString(newPartName) + ", "; + where += "CTC_PARTITION = " + quoteString(oldPartName) + " AND "; + } + if(oldTabName != null) { + update += "CTC_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "CTC_TABLE = " + quoteString(normalizeCase(oldTabName)) + 
" AND "; + } + if(oldDbName != null) { + update += "CTC_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "CTC_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + + update = "update HIVE_LOCKS set "; + where = " where "; + if(oldPartName != null) { + update += "HL_PARTITION = " + quoteString(newPartName) + ", "; + where += "HL_PARTITION = " + quoteString(oldPartName) + " AND "; + } + if(oldTabName != null) { + update += "HL_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "HL_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + } + if(oldDbName != null) { + update += "HL_DB = " + quoteString(normalizeCase(newDbName)); + where += "HL_DB = " + quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + + update = "update COMPACTION_QUEUE set "; + where = " where "; + if(oldPartName != null) { + update += "CQ_PARTITION = " + quoteString(newPartName) + ", "; + where += "CQ_PARTITION = " + quoteString(oldPartName) + " AND "; + } + if(oldTabName != null) { + update += "CQ_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "CQ_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + } + if(oldDbName != null) { + update += "CQ_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "CQ_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + update = "update COMPLETED_COMPACTIONS set "; + where = " where "; + if(oldPartName != null) { + update += "CC_PARTITION = " + quoteString(newPartName) + ", "; + where += "CC_PARTITION = " + quoteString(oldPartName) + " AND "; + } + if(oldTabName != null) { + update += "CC_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "CC_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + } + if(oldDbName != null) { + update += "CC_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "CC_DATABASE = " + 
quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + + update = "update WRITE_SET set "; + where = " where "; + if(oldPartName != null) { + update += "WS_PARTITION = " + quoteString(newPartName) + ", "; + where += "WS_PARTITION = " + quoteString(oldPartName) + " AND "; + } + if(oldTabName != null) { + update += "WS_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "WS_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + } + if(oldDbName != null) { + update += "WS_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "WS_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + + update = "update TXN_TO_WRITE_ID set "; + where = " where "; + if(oldTabName != null) { + update += "T2W_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "T2W_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + } + if(oldDbName != null) { + update += "T2W_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "T2W_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + + update = "update NEXT_WRITE_ID set "; + where = " where "; + if(oldTabName != null) { + update += "NWI_TABLE = " + quoteString(normalizeCase(newTabName)) + ", "; + where += "NWI_TABLE = " + quoteString(normalizeCase(oldTabName)) + " AND "; + } + if(oldDbName != null) { + update += "NWI_DATABASE = " + quoteString(normalizeCase(newDbName)); + where += "NWI_DATABASE = " + quoteString(normalizeCase(oldDbName)); + } + queries.add(update + where); + + for (String query : queries) { + LOG.debug("Going to execute update <" + query + ">"); + stmt.executeUpdate(query); + } + + LOG.debug("Going to commit: " + callSig); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback: " + callSig); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, callSig); + if (e.getMessage().contains("does not exist")) { + LOG.warn("Cannot perform " + callSig 
+ " since metastore table does not exist"); + } else { + throw new MetaException("Unable to " + callSig + ":" + StringUtils.stringifyException(e)); + } + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + } catch (RetryException e) { + onRename(oldCatName, oldDbName, oldTabName, oldPartName, + newCatName, newDbName, newTabName, newPartName); + } + } /** * For testing only, do not use. */ http://git-wip-us.apache.org/repos/asf/hive/blob/c7f713d5/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index b8e398f..4695f0d 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -266,6 +266,11 @@ public interface TxnStore extends Configurable { void cleanupRecords(HiveObjectType type, Database db, Table table, Iterator<Partition> partitionIterator) throws MetaException; + @RetrySemantics.Idempotent + void onRename(String oldCatName, String oldDbName, String oldTabName, String oldPartName, + String newCatName, String newDbName, String newTabName, String newPartName) + throws MetaException; + /** * Timeout transactions and/or locks. This should only be called by the compactor. */