Repository: hive Updated Branches: refs/heads/master 83e15d824 -> 9d23f7185
HIVE-19488: Enable CM root based on db parameter, identifying a db as source of replication (Mahesh Kumar Behera, reviewed by Sankar Hariappan) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9d23f718 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9d23f718 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9d23f718 Branch: refs/heads/master Commit: 9d23f7185df576f7258e78c423455276972570d9 Parents: 83e15d8 Author: Sankar Hariappan <sank...@apache.org> Authored: Tue May 29 12:15:17 2018 +0530 Committer: Sankar Hariappan <sank...@apache.org> Committed: Tue May 29 12:15:17 2018 +0530 ---------------------------------------------------------------------- .../hive/metastore/TestReplChangeManager.java | 3 + .../hadoop/hive/ql/parse/TestCopyUtils.java | 4 +- .../TestReplicationOnHDFSEncryptedZones.java | 4 +- .../hive/ql/parse/TestReplicationScenarios.java | 90 ++++++++++-- .../TestReplicationScenariosAcidTables.java | 4 +- ...TestReplicationScenariosAcrossInstances.java | 18 ++- .../TestHiveAuthorizerCheckInvocation.java | 4 +- .../compactor/TestCleanerWithReplication.java | 34 +++-- .../apache/hive/jdbc/TestJdbcWithMiniHS2.java | 4 +- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 8 +- .../apache/hadoop/hive/ql/metadata/Hive.java | 9 +- .../hive/ql/parse/MetaDataExportListener.java | 6 +- .../ql/parse/ReplicationSemanticAnalyzer.java | 23 ++- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 10 +- .../clientnegative/repl_dump_requires_admin.q | 2 +- .../clientnegative/repl_load_requires_admin.q | 2 +- .../repl_dump_requires_admin.q.out | 4 +- .../repl_load_requires_admin.q.out | 4 +- .../hadoop/hive/metastore/HiveAlterHandler.java | 11 +- .../hadoop/hive/metastore/HiveMetaStore.java | 145 +++++++++++-------- .../hive/metastore/ReplChangeManager.java | 23 +++ .../apache/hadoop/hive/metastore/Warehouse.java | 10 +- .../hadoop/hive/metastore/model/MDatabase.java | 21 ++- 23 files changed, 317 
insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java index e63250c..235bd11 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; @@ -119,6 +120,7 @@ public class TestReplChangeManager { client.dropDatabase(dbName, true, true); Database db = new Database(); + db.putToParameters(SOURCE_OF_REPLICATION, "1,2,3"); db.setName(dbName); client.createDatabase(db); @@ -204,6 +206,7 @@ public class TestReplChangeManager { client.dropDatabase(dbName, true, true); Database db = new Database(); + db.putToParameters(SOURCE_OF_REPLICATION, "1, 2, 3"); db.setName(dbName); client.createDatabase(db); http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java index f14b430..0e0a5cc 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java @@ -42,6 +42,7 @@ import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.HadoopShims.MiniMrShim; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; public class TestCopyUtils { @Rule @@ -110,7 +111,8 @@ public class TestCopyUtils { replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>()); primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); replicatedDbName = "replicated_" + primaryDbName; - primary.run("create database " + primaryDbName); + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java index 73102a7..7557280 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java @@ -39,6 +39,7 @@ import java.io.IOException; import java.util.HashMap; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; public class 
TestReplicationOnHDFSEncryptedZones { private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks"; @@ -83,7 +84,8 @@ public class TestReplicationOnHDFSEncryptedZones { public void setup() throws Throwable { primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); replicatedDbName = "replicated_" + primaryDbName; - primary.run("create database " + primaryDbName); + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); } @Test http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 8b33b78..f4cdf02 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -89,6 +89,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; public class TestReplicationScenarios { @@ -379,7 +380,7 @@ public class TestReplicationScenarios { verifySetup("SELECT * from " + dbName + ".unptned ORDER BY a", unptn_data, driver); // Create an empty database to load - run("CREATE DATABASE " + dbName + "_empty", driverMirror); + createDB(dbName + "_empty", driverMirror); // Load to an empty database Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, dbName + "_empty"); @@ -389,7 +390,7 @@ public class TestReplicationScenarios { String[] nullReplId = new String[]{ 
"NULL" }; // Create a database with a table - run("CREATE DATABASE " + dbName + "_withtable", driverMirror); + createDB(dbName + "_withtable", driverMirror); run("CREATE TABLE " + dbName + "_withtable.unptned(a string) STORED AS TEXTFILE", driverMirror); // Load using same dump to a DB with table. It should fail as DB is not empty. verifyFail("REPL LOAD " + dbName + "_withtable FROM '" + replDumpLocn + "'", driverMirror); @@ -398,7 +399,7 @@ public class TestReplicationScenarios { verifyRun("REPL STATUS " + dbName + "_withtable", nullReplId, driverMirror); // Create a database with a view - run("CREATE DATABASE " + dbName + "_withview", driverMirror); + createDB(dbName + "_withview", driverMirror); run("CREATE TABLE " + dbName + "_withview.unptned(a string) STORED AS TEXTFILE", driverMirror); run("CREATE VIEW " + dbName + "_withview.view AS SELECT * FROM " + dbName + "_withview.unptned", driverMirror); // Load using same dump to a DB with view. It should fail as DB is not empty. @@ -1893,8 +1894,8 @@ public class TestReplicationScenarios { String replDbName1 = dbName1 + "_dupe"; String replDbName2 = dbName2 + "_dupe"; - run("CREATE DATABASE " + dbName1, driver); - run("CREATE DATABASE " + dbName2, driver); + createDB(dbName1, driver); + createDB(dbName2, driver); run("CREATE TABLE " + dbName1 + ".unptned(a string) STORED AS TEXTFILE", driver); String[] unptn_data = new String[] { "ten", "twenty" }; @@ -1909,14 +1910,14 @@ public class TestReplicationScenarios { verifyRun("SELECT a from " + replDbName1 + ".unptned ORDER BY a", unptn_data, driverMirror); verifyIfTableNotExist(replDbName2, "unptned", metaStoreClientMirror); - run("ALTER TABLE " + dbName1 + ".unptned RENAME TO " + dbName2 + ".unptned_renamed", driver); + verifyFail("ALTER TABLE " + dbName1 + ".unptned RENAME TO " + dbName2 + ".unptned_renamed", driver); incrementalLoadAndVerify(dbName1, bootstrap1.lastReplId, replDbName1); incrementalLoadAndVerify(dbName2, bootstrap2.lastReplId, replDbName2); - 
verifyIfTableNotExist(replDbName1, "unptned", metaStoreClientMirror); verifyIfTableNotExist(replDbName1, "unptned_renamed", metaStoreClientMirror); - verifyRun("SELECT a from " + replDbName2 + ".unptned_renamed ORDER BY a", unptn_data, driverMirror); + verifyIfTableNotExist(replDbName2, "unptned_renamed", metaStoreClientMirror); + verifyRun("SELECT a from " + replDbName1 + ".unptned ORDER BY a", unptn_data, driverMirror); } @Test @@ -1928,8 +1929,8 @@ public class TestReplicationScenarios { String replDbName1 = dbName1 + "_dupe"; String replDbName2 = dbName2 + "_dupe"; - run("CREATE DATABASE " + dbName1, driver); - run("CREATE DATABASE " + dbName2, driver); + createDB(dbName1, driver); + createDB(dbName2, driver); run("CREATE TABLE " + dbName1 + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); String[] ptn_data = new String[] { "fifteen", "fourteen" }; @@ -1944,14 +1945,14 @@ public class TestReplicationScenarios { verifyRun("SELECT a from " + replDbName1 + ".ptned where (b=1) ORDER BY a", ptn_data, driverMirror); verifyIfTableNotExist(replDbName2, "ptned", metaStoreClientMirror); - run("ALTER TABLE " + dbName1 + ".ptned RENAME TO " + dbName2 + ".ptned_renamed", driver); + verifyFail("ALTER TABLE " + dbName1 + ".ptned RENAME TO " + dbName2 + ".ptned_renamed", driver); incrementalLoadAndVerify(dbName1, bootstrap1.lastReplId, replDbName1); incrementalLoadAndVerify(dbName2, bootstrap2.lastReplId, replDbName2); - verifyIfTableNotExist(replDbName1, "ptned", metaStoreClientMirror); verifyIfTableNotExist(replDbName1, "ptned_renamed", metaStoreClientMirror); - verifyRun("SELECT a from " + replDbName2 + ".ptned_renamed where (b=1) ORDER BY a", ptn_data, driverMirror); + verifyIfTableNotExist(replDbName2, "ptned_renamed", metaStoreClientMirror); + verifyRun("SELECT a from " + replDbName1 + ".ptned where (b=1) ORDER BY a", ptn_data, driverMirror); } @Test @@ -3076,7 +3077,7 @@ public class TestReplicationScenarios { // Setup long firstEventId = 
metaStoreClient.getCurrentNotificationEventId().getEventId(); String dbName = "testAuthForNotificationAPIs"; - driver.run("create database " + dbName); + createDB(dbName, driver); NotificationEventResponse rsp = metaStoreClient.getNextNotification(firstEventId, 0, null); assertEquals(1, rsp.getEventsSize()); // Test various scenarios @@ -3156,8 +3157,69 @@ public class TestReplicationScenarios { assertTrue(fileCount == fileCountAfter); } + @Test + public void testDumpNonReplDatabase() throws IOException { + String dbName = createDBNonRepl(testName.getMethodName(), driver); + verifyFail("REPL DUMP " + dbName, driver); + verifyFail("REPL DUMP " + dbName + " from 1 ", driver); + run("alter database " + dbName + " set dbproperties ('repl.source.for' = '1, 2, 3')", driver); + assertTrue(run("REPL DUMP " + dbName, true, driver)); + assertTrue(run("REPL DUMP " + dbName + " from 1 ", true, driver)); + dbName = createDBNonRepl(testName.getMethodName() + "_case", driver); + run("alter database " + dbName + " set dbproperties ('repl.SOURCE.for' = '1, 2, 3')", driver); + assertTrue(run("REPL DUMP " + dbName, true, driver)); + assertTrue(run("REPL DUMP " + dbName + " from 1 ", true, driver)); + } + + @Test + public void testRecycleFileNonReplDatabase() throws IOException { + String dbName = createDBNonRepl(testName.getMethodName(), driver); + + String cmDir = hconf.getVar(HiveConf.ConfVars.REPLCMDIR); + Path path = new Path(cmDir); + FileSystem fs = path.getFileSystem(hconf); + ContentSummary cs = fs.getContentSummary(path); + long fileCount = cs.getFileCount(); + + run("CREATE TABLE " + dbName + ".normal(a int)", driver); + run("INSERT INTO " + dbName + ".normal values (1)", driver); + + cs = fs.getContentSummary(path); + long fileCountAfter = cs.getFileCount(); + assertTrue(fileCount == fileCountAfter); + + run("INSERT INTO " + dbName + ".normal values (3)", driver); + run("TRUNCATE TABLE " + dbName + ".normal", driver); + + cs = fs.getContentSummary(path); + fileCountAfter = 
cs.getFileCount(); + assertTrue(fileCount == fileCountAfter); + + run("INSERT INTO " + dbName + ".normal values (4)", driver); + run("ALTER TABLE " + dbName + ".normal RENAME to " + dbName + ".normal1", driver); + verifyRun("SELECT count(*) from " + dbName + ".normal1", new String[]{"1"}, driver); + + cs = fs.getContentSummary(path); + fileCountAfter = cs.getFileCount(); + assertTrue(fileCount == fileCountAfter); + + run("INSERT INTO " + dbName + ".normal1 values (5)", driver); + run("DROP TABLE " + dbName + ".normal1", driver); + + cs = fs.getContentSummary(path); + fileCountAfter = cs.getFileCount(); + assertTrue(fileCount == fileCountAfter); + } + private static String createDB(String name, IDriver myDriver) { LOG.info("Testing " + name); + run("CREATE DATABASE " + name + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')", myDriver); + return name; + } + + private static String createDBNonRepl(String name, IDriver myDriver) { + LOG.info("Testing " + name); String dbName = name + "_" + tid; run("CREATE DATABASE " + dbName, myDriver); return dbName; http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 8ad507f..9a2d296 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import 
org.apache.hadoop.hive.shims.Utils; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import org.junit.rules.TestName; import org.junit.rules.TestRule; import org.slf4j.Logger; @@ -97,7 +98,8 @@ public class TestReplicationScenariosAcidTables { replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>()); primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); replicatedDbName = "replicated_" + primaryDbName; - primary.run("create database " + primaryDbName); + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); } @After http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index bcbf113..182a772 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -60,6 +60,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; public class TestReplicationScenariosAcrossInstances { @Rule @@ -98,7 +99,8 @@ public class TestReplicationScenariosAcrossInstances { replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>()); primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); 
replicatedDbName = "replicated_" + primaryDbName; - primary.run("create database " + primaryDbName); + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); } @After @@ -402,17 +404,20 @@ public class TestReplicationScenariosAcrossInstances { String randomTwo = RandomStringUtils.random(10, true, false); String dbOne = primaryDbName + randomOne; String dbTwo = primaryDbName + randomTwo; + primary.run("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')"); WarehouseInstance.Tuple tuple = primary .run("use " + primaryDbName) .run("create table t1 (i int, j int)") - .run("create database " + dbOne) + .run("create database " + dbOne + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("use " + dbOne) // TODO: this is wrong; this test sets up dummy txn manager and so it cannot create ACID tables. // This used to work by accident, now this works due a test flag. The test needs to be fixed. // Also applies for a couple more tests. 
.run("create table t1 (i int, j int) partitioned by (load_date date) " + "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ") - .run("create database " + dbTwo) + .run("create database " + dbTwo + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("use " + dbTwo) .run("create table t1 (i int, j int)") .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'", @@ -463,10 +468,12 @@ public class TestReplicationScenariosAcrossInstances { String randomOne = RandomStringUtils.random(10, true, false); String randomTwo = RandomStringUtils.random(10, true, false); String dbOne = primaryDbName + randomOne; + primary.run("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')"); WarehouseInstance.Tuple bootstrapTuple = primary .run("use " + primaryDbName) .run("create table t1 (i int, j int)") - .run("create database " + dbOne) + .run("create database " + dbOne + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("use " + dbOne) .run("create table t1 (i int, j int) partitioned by (load_date date) " + "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ") @@ -475,7 +482,8 @@ public class TestReplicationScenariosAcrossInstances { String dbTwo = primaryDbName + randomTwo; WarehouseInstance.Tuple incrementalTuple = primary - .run("create database " + dbTwo) + .run("create database " + dbTwo + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')") .run("use " + dbTwo) .run("create table t1 (i int, j int)") .run("use " + dbOne) http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java index a3cdd6e..e3c83d2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/security/authorization/plugin/TestHiveAuthorizerCheckInvocation.java @@ -53,6 +53,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; /** * Test HiveAuthorizer api invocation @@ -104,7 +105,8 @@ public class TestHiveAuthorizerCheckInvocation { runCmd("create table " + tableName + " (i int, j int, k string) partitioned by (city string, `date` string) "); runCmd("create view " + viewName + " as select * from " + tableName); - runCmd("create database " + dbName); + runCmd("create database " + dbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); runCmd("create table " + fullInTableName + "(i int)"); // Need a separate table for ACID testing since it has to be bucketed and it has to be Acid runCmd("create table " + acidTableName + " (i int, j int, k int) clustered by (k) into 2 buckets " + http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java index 597544f..14d3894 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java +++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.Table; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -52,6 +54,7 @@ public class TestCleanerWithReplication extends CompactorTest { private Path cmRootDirectory; private static FileSystem fs; private static MiniDFSCluster miniDFSCluster; + private final String dbName = "TestCleanerWithReplication"; @Before public void setup() throws Exception { @@ -68,6 +71,10 @@ public class TestCleanerWithReplication extends CompactorTest { fs.mkdirs(cmRootDirectory); } tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString()); + Database db = new Database(); + db.putToParameters(SOURCE_OF_REPLICATION, "1,2,3"); + db.setName(dbName); + ms.createDatabase(db); } @BeforeClass @@ -81,9 +88,10 @@ public class TestCleanerWithReplication extends CompactorTest { } @After - public void tearDown() throws IOException { + public void tearDown() throws Exception { fs.delete(cmRootDirectory, true); compactorTestCleanup(); + ms.dropDatabase(dbName, true, true, true); } @AfterClass @@ -93,16 +101,16 @@ public class TestCleanerWithReplication extends CompactorTest { @Test public void cleanupAfterMajorTableCompaction() throws Exception { - Table t = newTable("default", "camtc", false); + Table t = newTable(dbName, "camtc", false); addBaseFile(t, null, 20L, 20); addDeltaFile(t, null, 21L, 22L, 2); addDeltaFile(t, null, 23L, 24L, 2); addBaseFile(t, null, 25L, 25); - 
burnThroughTransactions("default", "camtc", 25); + burnThroughTransactions(dbName, "camtc", 25); - CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); + CompactionRequest rqst = new CompactionRequest(dbName, "camtc", CompactionType.MAJOR); txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); txnHandler.markCompacted(ci); @@ -113,7 +121,7 @@ public class TestCleanerWithReplication extends CompactorTest { @Test public void cleanupAfterMajorPartitionCompaction() throws Exception { - Table t = newTable("default", "campc", true); + Table t = newTable(dbName, "campc", true); Partition p = newPartition(t, "today"); addBaseFile(t, p, 20L, 20); @@ -121,9 +129,9 @@ public class TestCleanerWithReplication extends CompactorTest { addDeltaFile(t, p, 23L, 24L, 2); addBaseFile(t, p, 25L, 25); - burnThroughTransactions("default", "campc", 25); + burnThroughTransactions(dbName, "campc", 25); - CompactionRequest rqst = new CompactionRequest("default", "campc", CompactionType.MAJOR); + CompactionRequest rqst = new CompactionRequest(dbName, "campc", CompactionType.MAJOR); rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); @@ -135,16 +143,16 @@ public class TestCleanerWithReplication extends CompactorTest { @Test public void cleanupAfterMinorTableCompaction() throws Exception { - Table t = newTable("default", "camitc", false); + Table t = newTable(dbName, "camitc", false); addBaseFile(t, null, 20L, 20); addDeltaFile(t, null, 21L, 22L, 2); addDeltaFile(t, null, 23L, 24L, 2); addDeltaFile(t, null, 21L, 24L, 4); - burnThroughTransactions("default", "camitc", 25); + burnThroughTransactions(dbName, "camitc", 25); - CompactionRequest rqst = new CompactionRequest("default", "camitc", CompactionType.MINOR); + CompactionRequest rqst = new CompactionRequest(dbName, "camitc", CompactionType.MINOR); txnHandler.compact(rqst); CompactionInfo ci = 
txnHandler.findNextToCompact("fred"); txnHandler.markCompacted(ci); @@ -155,7 +163,7 @@ public class TestCleanerWithReplication extends CompactorTest { @Test public void cleanupAfterMinorPartitionCompaction() throws Exception { - Table t = newTable("default", "camipc", true); + Table t = newTable(dbName, "camipc", true); Partition p = newPartition(t, "today"); addBaseFile(t, p, 20L, 20); @@ -163,9 +171,9 @@ public class TestCleanerWithReplication extends CompactorTest { addDeltaFile(t, p, 23L, 24L, 2); addDeltaFile(t, p, 21L, 24L, 4); - burnThroughTransactions("default", "camipc", 25); + burnThroughTransactions(dbName, "camipc", 25); - CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR); + CompactionRequest rqst = new CompactionRequest(dbName, "camipc", CompactionType.MINOR); rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); CompactionInfo ci = txnHandler.findNextToCompact("fred"); http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index a5c41bc..d7d7097 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -84,6 +84,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; public class TestJdbcWithMiniHS2 { private static MiniHS2 miniHS2 = null; @@ -120,7 +121,8 @@ public class TestJdbcWithMiniHS2 { } Statement stmt = conDefault.createStatement(); stmt.execute("drop database if exists " + testDbName + " cascade"); - stmt.execute("create 
database " + testDbName); + stmt.execute("create database " + testDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); stmt.close(); try { http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 16d0854..e069499 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -1587,10 +1587,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } } - private void deleteDir(Path dir) throws HiveException { + private void deleteDir(Path dir, Database db) throws HiveException { try { Warehouse wh = new Warehouse(conf); - wh.deleteDir(dir, true); + wh.deleteDir(dir, true, db); } catch (MetaException e) { throw new HiveException(e); } @@ -1845,7 +1845,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { // If a failure occurs here, the directory containing the original files // will not be deleted. The user will run ARCHIVE again to clear this up if(pathExists(intermediateOriginalDir)) { - deleteDir(intermediateOriginalDir); + deleteDir(intermediateOriginalDir, db.getDatabase(tbl.getDbName())); } if(recovery) { @@ -2051,7 +2051,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { // If a failure happens here, the intermediate archive files won't be // deleted. The user will need to call unarchive again to clear those up. 
if(pathExists(intermediateArchivedDir)) { - deleteDir(intermediateArchivedDir); + deleteDir(intermediateArchivedDir, db.getDatabase(tbl.getDbName())); } if(recovery) { http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index e8554f9..107d032 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -112,6 +112,7 @@ import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.CheckConstraintsRequest; @@ -1737,8 +1738,10 @@ public class Hive { //for fullAcid tables we don't delete files for commands with OVERWRITE - we create a new // base_x. 
(there is Insert Overwrite and Load Data Overwrite) boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); + boolean needRecycle = !tbl.isTemporary() + && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName())); replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal, - isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary()); + isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle); } else { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation, @@ -2304,8 +2307,10 @@ private void constructOneLBLocationMap(FileStatus fSta, if (loadFileType == LoadFileType.REPLACE_ALL && !isTxnTable) { //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361 boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); + boolean needRecycle = !tbl.isTemporary() + && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName())); replaceFiles(tblPath, loadPath, destPath, tblPath, conf, isSrcLocal, isAutopurge, - newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, !tbl.isTemporary()); + newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle); } else { try { FileSystem fs = tbl.getDataLocation().getFileSystem(conf); http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java index 8fccf36..b6d8a28 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java @@ -90,11 +90,9 @@ public class MetaDataExportListener extends 
MetaStorePreEventListener { EximUtil.createExportDump(fs, outFile, mTbl, null, null, new HiveConf(conf, MetaDataExportListener.class)); if (moveMetadataToTrash == true) { - wh.deleteDir(metaPath, true); + wh.deleteDir(metaPath, true, false, false); } - } catch (IOException e) { - throw new MetaException(e.getMessage()); - } catch (SemanticException e) { + } catch (IOException | SemanticException e) { throw new MetaException(e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 562f497..d7b3104 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -23,6 +23,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; @@ -105,7 +107,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { switch (ast.getToken().getType()) { case TOK_REPL_DUMP: { LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump"); - initReplDump(ast); + try { + initReplDump(ast); + } catch (HiveException e) { + throw new SemanticException("repl dump failed " + e.getMessage()); + } analyzeReplDump(ast); break; } @@ -127,9 +133,22 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } } - private void initReplDump(ASTNode ast) { + 
private void initReplDump(ASTNode ast) throws HiveException { int numChildren = ast.getChildCount(); dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText()); + + for (String dbName : Utils.matchesDb(db, dbNameOrPattern)) { + Database database = db.getDatabase(dbName); + if (database != null) { + if (!ReplChangeManager.isSourceOfReplication(database)) { + throw new SemanticException("Cannot dump database " + dbNameOrPattern + + " as it is not a source of replication"); + } + } else { + throw new SemanticException("Cannot dump database " + dbNameOrPattern + " as it does not exist"); + } + } + // skip the first node, which is always required int currNode = 1; while (currNode < numChildren) { http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index fe6d2d6..3565616 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; @@ -35,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.Database; import 
org.apache.hadoop.hive.metastore.txn.CompactionInfo; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -320,7 +323,7 @@ public class Cleaner extends CompactorThread { return " id=" + ci.id; } private void removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci) - throws IOException { + throws IOException, HiveException { Path locPath = new Path(location); AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList); List<FileStatus> obsoleteDirs = dir.getObsolete(); @@ -346,10 +349,13 @@ public class Cleaner extends CompactorThread { } FileSystem fs = filesToDelete.get(0).getFileSystem(conf); + Database db = Hive.get().getDatabase(ci.dbname); for (Path dead : filesToDelete) { LOG.debug("Going to delete path " + dead.toString()); - replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true); + if (ReplChangeManager.isSourceOfReplication(db)) { + replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true); + } fs.delete(dead, true); } } http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q b/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q index cd9080c..9d712ca 100644 --- a/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q +++ b/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q @@ -16,7 +16,7 @@ drop database if exists test_repldump_adminpriv cascade; set user.name=ruser1; show role grant user ruser1; -create database test_repldump_adminpriv; +create database test_repldump_adminpriv with DBPROPERTIES ('repl.source.for' = '1,2,3'); create table test_repldump_adminpriv.dummy_tbl(a int) partitioned by (b string); show tables test_repldump_adminpriv; 
http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/ql/src/test/queries/clientnegative/repl_load_requires_admin.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/repl_load_requires_admin.q b/ql/src/test/queries/clientnegative/repl_load_requires_admin.q index 68a132d..0b1b12b 100644 --- a/ql/src/test/queries/clientnegative/repl_load_requires_admin.q +++ b/ql/src/test/queries/clientnegative/repl_load_requires_admin.q @@ -18,7 +18,7 @@ drop database if exists test_replload_adminpriv_tgt2 cascade; set user.name=ruser1; show role grant user ruser1; -create database test_replload_adminpriv_src; +create database test_replload_adminpriv_src with DBPROPERTIES ('repl.source.for' = '1,2,3'); create table test_replload_adminpriv_src.dummy_tbl(a int) partitioned by (b string); show tables test_replload_adminpriv_src; http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out b/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out index ac5103e..272b8b8 100644 --- a/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out +++ b/ql/src/test/results/clientnegative/repl_dump_requires_admin.q.out @@ -12,10 +12,10 @@ PREHOOK: type: SHOW_ROLE_GRANT POSTHOOK: query: show role grant user ruser1 POSTHOOK: type: SHOW_ROLE_GRANT public false -1 -PREHOOK: query: create database test_repldump_adminpriv +PREHOOK: query: create database test_repldump_adminpriv with DBPROPERTIES ('repl.source.for' = '1,2,3') PREHOOK: type: CREATEDATABASE PREHOOK: Output: database:test_repldump_adminpriv -POSTHOOK: query: create database test_repldump_adminpriv +POSTHOOK: query: create database test_repldump_adminpriv with DBPROPERTIES ('repl.source.for' = '1,2,3') POSTHOOK: type: CREATEDATABASE POSTHOOK: 
Output: database:test_repldump_adminpriv PREHOOK: query: create table test_repldump_adminpriv.dummy_tbl(a int) partitioned by (b string) http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out b/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out index 01b57a7..1499c39 100644 --- a/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out +++ b/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out @@ -20,10 +20,10 @@ PREHOOK: type: SHOW_ROLE_GRANT POSTHOOK: query: show role grant user ruser1 POSTHOOK: type: SHOW_ROLE_GRANT public false -1 -PREHOOK: query: create database test_replload_adminpriv_src +PREHOOK: query: create database test_replload_adminpriv_src with DBPROPERTIES ('repl.source.for' = '1,2,3') PREHOOK: type: CREATEDATABASE PREHOOK: Output: database:test_replload_adminpriv_src -POSTHOOK: query: create database test_replload_adminpriv_src +POSTHOOK: query: create database test_replload_adminpriv_src with DBPROPERTIES ('repl.source.for' = '1,2,3') POSTHOOK: type: CREATEDATABASE POSTHOOK: Output: database:test_replload_adminpriv_src PREHOOK: query: create table test_replload_adminpriv_src.dummy_tbl(a int) partitioned by (b string) http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 9ab9e85..be05838 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -242,7 +242,8 @@ public class HiveAlterHandler implements AlterHandler { " already exists : " + destPath); } // check that src exists and also checks permissions necessary, rename src to dest - if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath, true)) { + if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath, + ReplChangeManager.isSourceOfReplication(olddb))) { dataWasMoved = true; } } catch (IOException | MetaException e) { @@ -559,6 +560,7 @@ public class HiveAlterHandler implements AlterHandler { FileSystem srcFs; FileSystem destFs = null; boolean dataWasMoved = false; + Database db; try { msdb.openTransaction(); Table tbl = msdb.getTable(DEFAULT_CATALOG_NAME, dbname, name); @@ -593,9 +595,11 @@ public class HiveAlterHandler implements AlterHandler { // 3) rename the partition directory if it is not an external table if (!tbl.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) { try { + db = msdb.getDatabase(catName, dbname); + // if tbl location is available use it // else derive the tbl location from database location - destPath = wh.getPartitionPath(msdb.getDatabase(catName, dbname), tbl, new_part.getValues()); + destPath = wh.getPartitionPath(db, tbl, new_part.getValues()); destPath = constructRenamedPath(destPath, new Path(new_part.getSd().getLocation())); } catch (NoSuchObjectException e) { LOG.debug("Didn't find object in metastore ", e); @@ -633,7 +637,7 @@ public class HiveAlterHandler implements AlterHandler { } //rename the data directory - wh.renameDir(srcPath, destPath, true); + wh.renameDir(srcPath, destPath, ReplChangeManager.isSourceOfReplication(db)); LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done."); dataWasMoved = true; } @@ -645,7 +649,6 @@ public class HiveAlterHandler implements AlterHandler { LOG.error("Cannot rename partition directory from " + srcPath + " to " + destPath, me); throw 
me; } - new_part.getSd().setLocation(newPartLoc); } } else { http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 92d2e3f..c1d25db 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.metastore; import static org.apache.commons.lang.StringUtils.join; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_COMMENT; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; @@ -1019,7 +1020,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { // Create a default database inside the catalog Database db = new Database(DEFAULT_DATABASE_NAME, "Default database for catalog " + - catalog.getName(), catalog.getLocationUri(), Collections.emptyMap()); + catalog.getName(), catalog.getLocationUri(), Collections.emptyMap()); db.setCatalogName(catalog.getName()); create_database_core(ms, db); @@ -1035,7 +1036,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(catPath, true); + wh.deleteDir(catPath, true, false, false); } } @@ -1165,7 +1166,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { success = ms.commitTransaction(); } finally { if (success) { - wh.deleteDir(wh.getDnsPath(new Path(cat.getLocationUri())), false); + 
wh.deleteDir(wh.getDnsPath(new Path(cat.getLocationUri())), false, false, false); } else { ms.rollbackTransaction(); } @@ -1227,7 +1228,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(dbPath, true); + wh.deleteDir(dbPath, true, db); } } @@ -1385,6 +1386,10 @@ public class HiveMetaStore extends ThriftHiveMetastore { ms.openTransaction(); db = ms.getDatabase(catName, name); + if (!isInTest && ReplChangeManager.isSourceOfReplication(db)) { + throw new InvalidOperationException("can not drop a database which is a source of replication"); + } + firePreEvent(new PreDropDatabaseEvent(db, this)); String catPrependedName = MetaStoreUtils.prependCatalogToDbName(catName, name, conf); @@ -1516,14 +1521,14 @@ public class HiveMetaStore extends ThriftHiveMetastore { ms.rollbackTransaction(); } else if (deleteData) { // Delete the data in the partitions which have other locations - deletePartitionData(partitionPaths); + deletePartitionData(partitionPaths, false, db); // Delete the data in the tables which have other locations for (Path tablePath : tablePaths) { - deleteTableData(tablePath); + deleteTableData(tablePath, false, db); } // Delete the data in the database try { - wh.deleteDir(new Path(db.getLocationUri()), true); + wh.deleteDir(new Path(db.getLocationUri()), true, db); } catch (Exception e) { LOG.error("Failed to delete database directory: " + db.getLocationUri() + " " + e.getMessage()); @@ -1753,6 +1758,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { Map<String, String> transactionalListenerResponses = Collections.emptyMap(); Path tblPath = null; boolean success = false, madeDir = false; + Database db = null; try { if (!tbl.isSetCatName()) { tbl.setCatName(getDefaultCatalog(conf)); @@ -1761,11 +1767,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { ms.openTransaction(); - Database db = ms.getDatabase(tbl.getCatName(), tbl.getDbName()); - if (db == null) { - 
throw new NoSuchObjectException("The database " + - Warehouse.getCatalogQualifiedDbName(tbl.getCatName(), tbl.getDbName()) + " does not exist"); - } + db = ms.getDatabase(tbl.getCatName(), tbl.getDbName()); // get_table checks whether database exists, it should be moved here if (is_table_exists(ms, tbl.getCatName(), tbl.getDbName(), tbl.getTableName())) { @@ -1776,8 +1778,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) { if (tbl.getSd().getLocation() == null || tbl.getSd().getLocation().isEmpty()) { - tblPath = wh.getDefaultTablePath( - ms.getDatabase(tbl.getCatName(), tbl.getDbName()), tbl.getTableName()); + tblPath = wh.getDefaultTablePath(db, tbl.getTableName()); } else { if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) { LOG.warn("Location: " + tbl.getSd().getLocation() @@ -1900,7 +1901,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(tblPath, true); + wh.deleteDir(tblPath, true, db); } } @@ -2344,8 +2345,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { Table tbl = null; boolean ifPurge = false; Map<String, String> transactionalListenerResponses = Collections.emptyMap(); + Database db = null; try { ms.openTransaction(); + db = ms.getDatabase(catName, dbname); + // drop any partitions tbl = get_table_core(catName, dbname, name); if (tbl == null) { @@ -2396,9 +2400,9 @@ public class HiveMetaStore extends ThriftHiveMetastore { } else if (deleteData && !isExternal) { // Data needs deletion. Check if trash may be skipped. 
// Delete the data in the partitions which have other locations - deletePartitionData(partPaths, ifPurge); + deletePartitionData(partPaths, ifPurge, db); // Delete the data in the table - deleteTableData(tblPath, ifPurge); + deleteTableData(tblPath, ifPurge, db); // ok even if the data is not deleted } @@ -2417,23 +2421,15 @@ public class HiveMetaStore extends ThriftHiveMetastore { * Deletes the data in a table's location, if it fails logs an error * * @param tablePath - */ - private void deleteTableData(Path tablePath) { - deleteTableData(tablePath, false); - } - - /** - * Deletes the data in a table's location, if it fails logs an error - * - * @param tablePath * @param ifPurge completely purge the table (skipping trash) while removing * data from warehouse + * @param db database the table belongs to */ - private void deleteTableData(Path tablePath, boolean ifPurge) { + private void deleteTableData(Path tablePath, boolean ifPurge, Database db) { if (tablePath != null) { try { - wh.deleteDir(tablePath, true, ifPurge); + wh.deleteDir(tablePath, true, ifPurge, db); } catch (Exception e) { LOG.error("Failed to delete table directory: " + tablePath + " " + e.getMessage()); @@ -2442,28 +2438,19 @@ public class HiveMetaStore extends ThriftHiveMetastore { } /** - * Give a list of partitions' locations, tries to delete each one - * and for each that fails logs an error. - * - * @param partPaths - */ - private void deletePartitionData(List<Path> partPaths) { - deletePartitionData(partPaths, false); - } - - /** * Give a list of partitions' locations, tries to delete each one * and for each that fails logs an error. 
* * @param partPaths * @param ifPurge completely purge the partition (skipping trash) while * removing data from warehouse + * @param db database the partition belongs to */ - private void deletePartitionData(List<Path> partPaths, boolean ifPurge) { + private void deletePartitionData(List<Path> partPaths, boolean ifPurge, Database db) { if (partPaths != null && !partPaths.isEmpty()) { for (Path partPath : partPaths) { try { - wh.deleteDir(partPath, true, ifPurge); + wh.deleteDir(partPath, true, ifPurge, db); } catch (Exception e) { LOG.error("Failed to delete partition directory: " + partPath + " " + e.getMessage()); @@ -2695,6 +2682,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { String[] parsedDbName = parseDbName(dbName, conf); Table tbl = get_table_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName); boolean isAutopurge = (tbl.isSetParameters() && "true".equalsIgnoreCase(tbl.getParameters().get("auto.purge"))); + Database db = get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME]); // This is not transactional for (Path location : getLocationsForTruncate(getMS(), parsedDbName[CAT_NAME], @@ -2705,7 +2693,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(getConf(), fs, location); FileStatus targetStatus = fs.getFileStatus(location); String targetGroup = targetStatus == null ? 
null : targetStatus.getGroup(); - wh.deleteDir(location, true, isAutopurge); + wh.deleteDir(location, true, isAutopurge, db); fs.mkdirs(location); HdfsUtils.setFullFileStatus(getConf(), status, targetGroup, fs, location, false); } else { @@ -2714,7 +2702,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { continue; } for (final FileStatus status : statuses) { - wh.deleteDir(status.getPath(), true, isAutopurge); + wh.deleteDir(status.getPath(), true, isAutopurge, db); } } } @@ -2973,13 +2961,14 @@ public class HiveMetaStore extends ThriftHiveMetastore { private Partition append_partition_common(RawStore ms, String catName, String dbName, String tableName, List<String> part_vals, EnvironmentContext envContext) - throws InvalidObjectException, AlreadyExistsException, MetaException { + throws InvalidObjectException, AlreadyExistsException, MetaException, NoSuchObjectException { Partition part = new Partition(); boolean success = false, madeDir = false; Path partLocation = null; Table tbl = null; Map<String, String> transactionalListenerResponses = Collections.emptyMap(); + Database db = null; try { ms.openTransaction(); part.setCatName(catName); @@ -2999,6 +2988,8 @@ public class HiveMetaStore extends ThriftHiveMetastore { "Cannot append a partition to a view"); } + db = get_database_core(catName, dbName); + firePreEvent(new PreAddPartitionEvent(tbl, part, this)); part.setSd(tbl.getSd().deepCopy()); @@ -3051,7 +3042,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(partLocation, true); + wh.deleteDir(partLocation, true, db); } } @@ -3177,6 +3168,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { final List<Partition> existingParts = new ArrayList<>(); Table tbl = null; Map<String, String> transactionalListenerResponses = Collections.emptyMap(); + Database db = null; try { ms.openTransaction(); @@ -3187,6 +3179,8 @@ public class HiveMetaStore extends 
ThriftHiveMetastore { " does not exist"); } + db = ms.getDatabase(catName, dbName); + if (!parts.isEmpty()) { firePreEvent(new PreAddPartitionEvent(tbl, parts, this)); } @@ -3310,7 +3304,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { for (Map.Entry<PartValEqWrapperLite, Boolean> e : addedPartitions.entrySet()) { if (e.getValue()) { // we just created this directory - it's not a case of pre-creation, so we nuke. - wh.deleteDir(new Path(e.getKey().location), true); + wh.deleteDir(new Path(e.getKey().location), true, db); } } @@ -3446,6 +3440,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { .getPartitionIterator(); Table tbl = null; Map<String, String> transactionalListenerResponses = Collections.emptyMap(); + Database db = null; try { ms.openTransaction(); tbl = ms.getTable(catName, dbName, tblName); @@ -3453,7 +3448,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { throw new InvalidObjectException("Unable to add partitions because " + "database or table " + dbName + "." + tblName + " does not exist"); } - + db = ms.getDatabase(catName, dbName); firePreEvent(new PreAddPartitionEvent(tbl, partitionSpecProxy, this)); Set<PartValEqWrapperLite> partsToAdd = new HashSet<>(partitionSpecProxy.size()); List<Partition> partitionsToAdd = new ArrayList<>(partitionSpecProxy.size()); @@ -3569,7 +3564,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { for (Map.Entry<PartValEqWrapperLite, Boolean> e : addedPartitions.entrySet()) { if (e.getValue()) { // we just created this directory - it's not a case of pre-creation, so we nuke. 
- wh.deleteDir(new Path(e.getKey().location), true); + wh.deleteDir(new Path(e.getKey().location), true, db); } } } @@ -3721,7 +3716,8 @@ public class HiveMetaStore extends ThriftHiveMetastore { success = ms.addPartition(part); } finally { if (!success && madeDir) { - wh.deleteDir(new Path(part.getSd().getLocation()), true); + wh.deleteDir(new Path(part.getSd().getLocation()), true, + ms.getDatabase(tbl.getCatName(), tbl.getDbName())); } } @@ -3799,6 +3795,19 @@ public class HiveMetaStore extends ThriftHiveMetastore { return new Partition(); } + private boolean isRenameAllowed(String catalogName, String srcDBName, String destDBName) + throws MetaException, NoSuchObjectException { + RawStore ms = getMS(); + if (!srcDBName.equalsIgnoreCase(destDBName)) { + Database destDB = ms.getDatabase(catalogName, destDBName); + Database srcDB = ms.getDatabase(catalogName, srcDBName); + if (ReplChangeManager.isSourceOfReplication(srcDB) || ReplChangeManager.isSourceOfReplication(destDB)) { + return false; + } + } + return true; + } + @Override public List<Partition> exchange_partitions(Map<String, String> partitionSpecs, String sourceDbName, String sourceTableName, String destDbName, @@ -3834,6 +3843,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { getCatalogQualifiedTableName(parsedSourceDbName[CAT_NAME], parsedSourceDbName[DB_NAME], sourceTableName) + " not found"); } + List<String> partVals = MetaStoreUtils.getPvals(sourceTable.getPartitionKeys(), partitionSpecs); List<String> partValsPresent = new ArrayList<> (); @@ -3886,6 +3896,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { } } + if (!isRenameAllowed(parsedDestDbName[CAT_NAME], parsedSourceDbName[DB_NAME], parsedDestDbName[DB_NAME])) { + throw new MetaException("Exchange partition not allowed for " + + getCatalogQualifiedTableName(parsedSourceDbName[CAT_NAME], + parsedSourceDbName[DB_NAME], sourceTableName) + " Dest db : " + destDbName); + } try { for (Partition partition: partitionsToExchange) 
{ Partition destPartition = new Partition(partition); @@ -3981,6 +3996,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { Path archiveParentDir = null; boolean mustPurge = false; boolean isExternalTbl = false; + boolean isSourceOfReplication = false; Map<String, String> transactionalListenerResponses = Collections.emptyMap(); if (db_name == null) { @@ -4028,6 +4044,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { new DropPartitionEvent(tbl, part, true, deleteData, this), envContext); } + isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, db_name)); success = ms.commitTransaction(); } } finally { @@ -4043,13 +4060,14 @@ public class HiveMetaStore extends ThriftHiveMetastore { } // Archived partitions have har:/to_har_file as their location. // The original directory was saved in params + if (isArchived) { assert (archiveParentDir != null); - wh.deleteDir(archiveParentDir, true, mustPurge); + wh.deleteDir(archiveParentDir, true, mustPurge, isSourceOfReplication); } else { assert (partPath != null); - wh.deleteDir(partPath, true, mustPurge); - deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge); + wh.deleteDir(partPath, true, mustPurge, isSourceOfReplication); + deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, isSourceOfReplication); } // ok even if the data is not deleted } @@ -4077,12 +4095,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { || (tbl.isSetParameters() && "true".equalsIgnoreCase(tbl.getParameters().get("auto.purge"))); } - private void deleteParentRecursive(Path parent, int depth, boolean mustPurge) throws IOException, MetaException { + private void deleteParentRecursive(Path parent, int depth, boolean mustPurge, boolean needRecycle) + throws IOException, MetaException { if (depth > 0 && parent != null && wh.isWritable(parent)) { if (wh.isDir(parent) && wh.isEmpty(parent)) { - wh.deleteDir(parent, true, mustPurge); + 
wh.deleteDir(parent, true, mustPurge, needRecycle); } - deleteParentRecursive(parent.getParent(), depth - 1, mustPurge); + deleteParentRecursive(parent.getParent(), depth - 1, mustPurge, needRecycle); } } @@ -4124,6 +4143,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { List<Partition> parts = null; boolean mustPurge = false; List<Map<String, String>> transactionalListenerResponses = Lists.newArrayList(); + boolean isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, dbName)); try { // We need Partition-s for firing events and for result; DN needs MPartition-s to drop. @@ -4231,12 +4251,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { // Archived partitions have har:/to_har_file as their location. // The original directory was saved in params for (Path path : archToDelete) { - wh.deleteDir(path, true, mustPurge); + wh.deleteDir(path, true, mustPurge, isSourceOfReplication); } for (PathAndPartValSize p : dirsToDelete) { - wh.deleteDir(p.path, true, mustPurge); + wh.deleteDir(p.path, true, mustPurge, isSourceOfReplication); try { - deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge); + deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, isSourceOfReplication); } catch (IOException ex) { LOG.warn("Error from deleteParentRecursive", ex); throw new MetaException("Failed to delete parent: " + ex.getMessage()); @@ -4860,6 +4880,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { throws InvalidOperationException, MetaException { startFunction("alter_table", ": " + getCatalogQualifiedTableName(catName, dbname, name) + " newtbl=" + newTable.getTableName()); + // Update the time if it hasn't been specified. 
if (newTable.getParameters() == null || newTable.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) { @@ -4884,6 +4905,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { Exception ex = null; try { Table oldt = get_table_core(catName, dbname, name); + if (!isRenameAllowed(catName, dbname, newTable.getDbName())) { + throw new MetaException("Alter table not allowed for table " + + getCatalogQualifiedTableName(catName, dbname, name) + + " new table = " + getCatalogQualifiedTableName(newTable)); + } firePreEvent(new PreAlterTableEvent(oldt, newTable, this)); alterHandler.alterTable(getMS(), wh, catName, dbname, name, newTable, envContext, this); @@ -6850,12 +6876,15 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (func == null) { throw new NoSuchObjectException("Function " + funcName + " does not exist"); } + Boolean isSourceOfReplication = + ReplChangeManager.isSourceOfReplication(get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME])); + // if copy of jar to change management fails we fail the metastore transaction, since the // user might delete the jars on HDFS externally after dropping the function, hence having // a copy is required to allow incremental replication to work correctly. 
if (func.getResourceUris() != null && !func.getResourceUris().isEmpty()) { for (ResourceUri uri : func.getResourceUris()) { - if (uri.getUri().toLowerCase().startsWith("hdfs:")) { + if (uri.getUri().toLowerCase().startsWith("hdfs:") && isSourceOfReplication) { wh.addToChangeManagement(new Path(uri.getUri())); } } http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index 79ba7ff..f7018c2 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.metastore; import java.io.IOException; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -33,6 +34,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; @@ -56,6 +58,7 @@ public class ReplChangeManager { private static final String ORIG_LOC_TAG = "user.original-loc"; static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash"; private static final String URI_FRAGMENT_SEPARATOR = "#"; + public static final String SOURCE_OF_REPLICATION = "repl.source.for"; public enum RecycleType { MOVE, @@ -467,4 +470,24 @@ 
public class ReplChangeManager { 0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS); } } + + public static boolean isSourceOfReplication(Database db) { + // Can not judge, so assuming replication is not enabled. + assert (db != null); + String replPolicyIds = getReplPolicyIdString(db); + return !StringUtils.isEmpty(replPolicyIds); + } + + public static String getReplPolicyIdString(Database db) { + if (db != null) { + Map<String, String> m = db.getParameters(); + if ((m != null) && (m.containsKey(SOURCE_OF_REPLICATION))) { + String replPolicyId = m.get(SOURCE_OF_REPLICATION); + LOG.debug("repl policy for database {} is {}", db.getName(), replPolicyId); + return replPolicyId; + } + LOG.debug("Repl policy is not set for database {}", db.getName()); + } + return null; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java index 88cbfcd..e31935e 100755 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java @@ -301,18 +301,16 @@ public class Warehouse { } } - public boolean deleteDir(Path f, boolean recursive) throws MetaException { - return deleteDir(f, recursive, false); + public boolean deleteDir(Path f, boolean recursive, Database db) throws MetaException { + return deleteDir(f, recursive, false, db); } - public boolean deleteDir(Path f, boolean recursive, boolean ifPurge) throws MetaException { - return deleteDir(f, recursive, ifPurge, true); + public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, Database db) throws MetaException { + return
deleteDir(f, recursive, ifPurge, ReplChangeManager.isSourceOfReplication(db)); } public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, boolean needCmRecycle) throws MetaException { - // no need to create the CM recycle file for temporary tables if (needCmRecycle) { - try { cm.recycle(f, RecycleType.MOVE, ifPurge); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/9d23f718/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java index fa30330..815b39c 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MDatabase.java @@ -21,7 +21,12 @@ */ package org.apache.hadoop.hive.metastore.model; +import org.apache.hadoop.hive.metastore.ReplChangeManager; + +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; /** * Storage Class representing the Hive MDatabase in a rdbms @@ -109,7 +114,21 @@ public class MDatabase { * @param parameters the parameters mapping. */ public void setParameters(Map<String, String> parameters) { - this.parameters = parameters; + if (parameters == null) { + this.parameters = null; + return; + } + this.parameters = new HashMap<>(); + Set<String> keys = new HashSet<>(parameters.keySet()); + for(String key : keys) { + // Normalize the case for source of replication parameter + if (ReplChangeManager.SOURCE_OF_REPLICATION.equalsIgnoreCase(key)) { + // TODO : Some extra validation can also be added as this is a user provided parameter. 
+ this.parameters.put(ReplChangeManager.SOURCE_OF_REPLICATION, parameters.get(key)); + } else { + this.parameters.put(key, parameters.get(key)); + } + } } public String getOwnerName() {