HIVE-19739: Bootstrap REPL LOAD to use checkpoints to validate and skip the loaded data/metadata (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)
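Summary of the change: each object created by a bootstrap REPL LOAD (database, table, partition) is tagged with the dump directory under the "hive.repl.ckpt.key" property. On a retry with the same dump, objects carrying a matching checkpoint are skipped; a retry with a different dump is rejected. Below is a minimal, self-contained sketch of that decision logic, written for illustration only — the class name, the use of IllegalStateException, and the main() driver are not part of the patch; the real implementation is ReplUtils.replCkptStatus and the ReplUtils.ReplLoadOpType enum in the diff that follows.

import java.util.Map;

/** Illustrative sketch (not the Hive classes) of the bootstrap checkpoint check. */
public final class CkptSketch {

  static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";

  /** Mirrors ReplUtils.ReplLoadOpType in the patch. */
  enum ReplLoadOpType { LOAD_NEW, LOAD_SKIP, LOAD_REPLACE }

  /**
   * Returns true when the object was already bootstrapped from the same dump (skip it),
   * false when it was never bootstrapped (load it), and throws when it was bootstrapped
   * from a different dump (retrying with another dump is not allowed).
   */
  static boolean replCkptStatus(String objName, Map<String, String> props, String dumpRoot) {
    String ckpt = (props == null) ? null : props.get(REPL_CHECKPOINT_KEY);
    if (ckpt == null || ckpt.isEmpty()) {
      return false;                       // never bootstrapped: load it fresh
    }
    if (ckpt.equals(dumpRoot)) {
      return true;                        // same dump: already loaded, skip
    }
    throw new IllegalStateException("REPL LOAD with dump " + dumpRoot + " is not allowed: "
        + objName + " was already bootstrap loaded by dump " + ckpt);
  }

  public static void main(String[] args) {
    Map<String, String> freshDb = Map.of();
    Map<String, String> loadedDb = Map.of(REPL_CHECKPOINT_KEY, "/repl/dump/1");

    System.out.println(replCkptStatus("db1", freshDb, "/repl/dump/1"));   // false -> LOAD_NEW
    System.out.println(replCkptStatus("db1", loadedDb, "/repl/dump/1"));  // true  -> LOAD_SKIP
    // replCkptStatus("db1", loadedDb, "/repl/dump/2") would throw: different dump rejected.
  }
}

In the patch, LoadDatabase maps this result onto LOAD_NEW / LOAD_SKIP / LOAD_REPLACE, LoadTable/LoadPartitions and the constraint/function loaders skip already-created objects, and the new tests retry REPL LOAD after injected metastore failures (via InjectableBehaviourObjectStore) to verify that only the not-yet-loaded objects are created on resume.
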
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b8596289 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b8596289 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b8596289 Branch: refs/heads/branch-3 Commit: b859628967a48753102f3c305e3f2f754b70f7bb Parents: 600ec62 Author: Sankar Hariappan <sank...@apache.org> Authored: Sat Jun 16 23:38:33 2018 -0700 Committer: Sankar Hariappan <sank...@apache.org> Committed: Sat Jun 16 23:38:33 2018 -0700 ---------------------------------------------------------------------- .../TestReplicationScenariosAcidTables.java | 93 ++++++ ...TestReplicationScenariosAcrossInstances.java | 332 +++++++++++++++++-- .../hadoop/hive/ql/parse/WarehouseInstance.java | 70 +++- .../hadoop/hive/ql/exec/repl/ReplUtils.java | 65 ++++ .../ql/exec/repl/bootstrap/ReplLoadTask.java | 20 +- .../events/filesystem/FSTableEvent.java | 6 +- .../repl/bootstrap/load/LoadConstraint.java | 89 ++++- .../exec/repl/bootstrap/load/LoadDatabase.java | 112 +++++-- .../exec/repl/bootstrap/load/LoadFunction.java | 26 ++ .../bootstrap/load/table/LoadPartitions.java | 125 ++++--- .../repl/bootstrap/load/table/LoadTable.java | 95 +++--- .../ql/parse/ReplicationSemanticAnalyzer.java | 6 +- .../repl/load/message/DropPartitionHandler.java | 43 +-- .../hadoop/hive/metastore/ObjectStore.java | 17 +- .../InjectableBehaviourObjectStore.java | 119 ++++++- 15 files changed, 968 insertions(+), 250 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 9a2d296..a1498ca 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -28,6 +28,9 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import org.junit.rules.TestName; import org.junit.rules.TestRule; @@ -45,6 +48,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import javax.annotation.Nullable; /** * TestReplicationScenariosAcidTables - test replication for ACID tables @@ -345,4 +349,93 @@ public class TestReplicationScenariosAcidTables { .run("REPL STATUS " + replicatedDbName) .verifyResult(bootStrapDump.lastReplicationId); } + + @Test + public void testAcidBootstrapReplLoadRetryAfterFailure() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties 
(\"transactional\"=\"true\")") + .run("insert into t1 values(1)") + .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " + + "\"transactional_properties\"=\"insert_only\")") + .run("insert into t2 partition(name='bob') values(11)") + .run("insert into t2 partition(name='carl') values(10)") + .dump(primaryDbName, null); + + WarehouseInstance.Tuple tuple2 = primary + .run("use " + primaryDbName) + .dump(primaryDbName, null); + + // Inject a behavior where REPL LOAD failed when try to load table "t2", it fails. + BehaviourInjection<CallerArguments, Boolean> callerVerifier + = new BehaviourInjection<CallerArguments, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + injectionPathCalled = true; + if (!args.dbName.equalsIgnoreCase(replicatedDbName)) { + LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)); + return false; + } + if (args.tblName != null) { + LOG.warn("Verifier - Table: " + String.valueOf(args.tblName)); + return args.tblName.equals("t1"); + } + return true; + } + }; + InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); + + List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'"); + replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs); + callerVerifier.assertInjectionsPerformed(true, false); + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult("null") + .run("show tables") + .verifyResults(new String[] { "t1" }) + .run("select id from t1") + .verifyResults(Arrays.asList("1")); + + // Retry with different dump should fail. + replica.loadFailure(replicatedDbName, tuple2.dumpLocation); + + // Verify if no create table on t1. Only table t2 should be created in retry. + callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + injectionPathCalled = true; + if (!args.dbName.equalsIgnoreCase(replicatedDbName)) { + LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)); + return false; + } + if (args.tblName != null) { + LOG.warn("Verifier - Table: " + String.valueOf(args.tblName)); + return args.tblName.equals("t2"); + } + return true; + } + }; + InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); + + // Retry with same dump with which it was already loaded should resume the bootstrap load. + // This time, it completes by adding just constraints for table t4. 
+ replica.load(replicatedDbName, tuple.dumpLocation); + callerVerifier.assertInjectionsPerformed(true, false); + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("1")) + .run("select name from t2 order by name") + .verifyResults(Arrays.asList("bob", "carl")); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 35437b1..4b3a976 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -29,6 +29,9 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.ql.util.DependencyResolver; import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; @@ -53,11 +56,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import javax.annotation.Nullable; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; @@ -436,8 +441,10 @@ public class TestReplicationScenariosAcrossInstances { End of additional steps */ + // Reset ckpt and last repl ID keys to empty set for allowing bootstrap load replica.run("show databases") .verifyFailure(new String[] { primaryDbName, dbOne, dbTwo }) + .run("alter database default set dbproperties ('hive.repl.ckpt.key'='', 'repl.last.id'='')") .load("", tuple.dumpLocation) .run("show databases") .verifyResults(new String[] { "default", primaryDbName, dbOne, dbTwo }) @@ -450,8 +457,9 @@ public class TestReplicationScenariosAcrossInstances { .run("use " + dbTwo) .run("show tables") .verifyResults(new String[] { "t1" }); + /* - Start of cleanup + Start of cleanup */ replica.run("drop database " + primaryDbName + " cascade"); @@ -505,8 +513,10 @@ public class TestReplicationScenariosAcrossInstances { End of additional steps */ + // Reset ckpt and last repl ID keys to empty set for allowing bootstrap load replica.run("show databases") .verifyFailure(new String[] { primaryDbName, dbOne, dbTwo }) + .run("alter database 
default set dbproperties ('hive.repl.ckpt.key'='', 'repl.last.id'='')") .load("", bootstrapTuple.dumpLocation) .run("show databases") .verifyResults(new String[] { "default", primaryDbName, dbOne }) @@ -772,6 +782,23 @@ public class TestReplicationScenariosAcrossInstances { assertFalse(fs.exists(cSerdesTableDumpLocation)); } + private void verifyIfCkptSet(WarehouseInstance wh, String dbName, String dumpDir) throws Exception { + Database db = wh.getDatabase(replicatedDbName); + verifyIfCkptSet(db.getParameters(), dumpDir); + + List<String> tblNames = wh.getAllTables(dbName); + for (String tblName : tblNames) { + Table tbl = wh.getTable(dbName, tblName); + verifyIfCkptSet(tbl.getParameters(), dumpDir); + if (tbl.getPartitionKeysSize() != 0) { + List<Partition> partitions = wh.getAllPartitions(dbName, tblName); + for (Partition ptn : partitions) { + verifyIfCkptSet(ptn.getParameters(), dumpDir); + } + } + } + } + private void verifyIfCkptSet(Map<String, String> props, String dumpDir) { assertTrue(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY)); assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir)); @@ -786,41 +813,6 @@ public class TestReplicationScenariosAcrossInstances { } @Test - public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable { - WarehouseInstance.Tuple tuple = primary - .run("use " + primaryDbName) - .run("create table t1 (id int)") - .run("insert into table t1 values (10)") - .run("create table t2 (place string) partitioned by (country string)") - .run("insert into table t2 partition(country='india') values ('bangalore')") - .run("insert into table t2 partition(country='uk') values ('london')") - .run("insert into table t2 partition(country='us') values ('sfo')") - .dump(primaryDbName, null); - - replica.load(replicatedDbName, tuple.dumpLocation) - .run("use " + replicatedDbName) - .run("repl status " + replicatedDbName) - .verifyResult(tuple.lastReplicationId) - .run("show tables") - .verifyResults(new String[] { "t1", "t2" }) - .run("select country from t2") - .verifyResults(Arrays.asList("india", "uk", "us")); - - Database db = replica.getDatabase(replicatedDbName); - verifyIfCkptSet(db.getParameters(), tuple.dumpLocation); - Table t1 = replica.getTable(replicatedDbName, "t1"); - verifyIfCkptSet(t1.getParameters(), tuple.dumpLocation); - Table t2 = replica.getTable(replicatedDbName, "t2"); - verifyIfCkptSet(t2.getParameters(), tuple.dumpLocation); - Partition india = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("india")); - verifyIfCkptSet(india.getParameters(), tuple.dumpLocation); - Partition us = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("us")); - verifyIfCkptSet(us.getParameters(), tuple.dumpLocation); - Partition uk = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("uk")); - verifyIfCkptSet(uk.getParameters(), tuple.dumpLocation); - } - - @Test public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws Throwable { WarehouseInstance.Tuple tuplePrimary = primary .run("use " + primaryDbName) @@ -944,4 +936,272 @@ public class TestReplicationScenariosAcrossInstances { replica.run("drop database if exists " + importDbFromReplica + " cascade"); } + + @Test + public void testIfBootstrapReplLoadFailWhenRetryAfterBootstrapComplete() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("insert into table t1 values (10)") + .run("create table t2 (place string) partitioned by 
(country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='uk') values ('london')") + .run("insert into table t2 partition(country='us') values ('sfo')") + .dump(primaryDbName, null); + + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("10")) + .run("select country from t2 order by country") + .verifyResults(Arrays.asList("india", "uk", "us")); + verifyIfCkptSet(replica, replicatedDbName, tuple.dumpLocation); + + WarehouseInstance.Tuple tuple_2 = primary + .run("use " + primaryDbName) + .dump(primaryDbName, null); + + // Retry with different dump should fail. + replica.loadFailure(replicatedDbName, tuple_2.dumpLocation); + + // Retry with same dump with which it was already loaded also fails. + replica.loadFailure(replicatedDbName, tuple.dumpLocation); + + // Retry from same dump when the database is empty is also not allowed. + replica.run("drop table t1") + .run("drop table t2") + .loadFailure(replicatedDbName, tuple.dumpLocation); + } + + @Test + public void testBootstrapReplLoadRetryAfterFailureForTablesAndConstraints() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1(a string, b string, primary key (a, b) disable novalidate rely)") + .run("create table t2(a string, b string, foreign key (a, b) references t1(a, b) disable novalidate)") + .run("create table t3(a string, b string not null disable, unique (a) disable)") + .dump(primaryDbName, null); + + WarehouseInstance.Tuple tuple2 = primary + .run("use " + primaryDbName) + .dump(primaryDbName, null); + + // Need to drop the primary DB as metastore is shared by both primary/replica. So, constraints + // conflict when loaded. Some issue with framework which needs to be relook into later. + primary.run("drop database if exists " + primaryDbName + " cascade"); + + // Allow create table only on t1. Create should fail for rest of the tables and hence constraints + // also not loaded. + BehaviourInjection<CallerArguments, Boolean> callerVerifier + = new BehaviourInjection<CallerArguments, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + injectionPathCalled = true; + if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) { + LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + + " Constraint Table: " + String.valueOf(args.constraintTblName)); + return false; + } + if (args.tblName != null) { + LOG.warn("Verifier - Table: " + String.valueOf(args.tblName)); + return args.tblName.equals("t1"); + } + return true; + } + }; + InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); + + // Trigger bootstrap dump which just creates table t1 and other tables (t2, t3) and constraints not loaded. 
+ List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'"); + replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs); + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + callerVerifier.assertInjectionsPerformed(true, false); + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult("null") + .run("show tables") + .verifyResults(new String[] { "t1" }); + assertEquals(0, replica.getPrimaryKeyList(replicatedDbName, "t1").size()); + assertEquals(0, replica.getUniqueConstraintList(replicatedDbName, "t3").size()); + assertEquals(0, replica.getNotNullConstraintList(replicatedDbName, "t3").size()); + assertEquals(0, replica.getForeignKeyList(replicatedDbName, "t2").size()); + + // Retry with different dump should fail. + replica.loadFailure(replicatedDbName, tuple2.dumpLocation); + + // Verify if create table is not called on table t1 but called for t2 and t3. + // Also, allow constraint creation only on t1 and t3. Foreign key creation on t2 fails. + callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + injectionPathCalled = true; + if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.funcName != null)) { + LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + " Func: " + String.valueOf(args.funcName)); + return false; + } + if (args.tblName != null) { + LOG.warn("Verifier - Table: " + String.valueOf(args.tblName)); + return (args.tblName.equals("t2") || args.tblName.equals("t3")); + } + if (args.constraintTblName != null) { + LOG.warn("Verifier - Constraint Table: " + String.valueOf(args.constraintTblName)); + return (args.constraintTblName.equals("t1") || args.constraintTblName.equals("t3")); + } + return true; + } + }; + InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); + + // Retry with same dump with which it was already loaded should resume the bootstrap load. + // This time, it fails when try to load the foreign key constraints. All other constraints are loaded. + replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs); + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + callerVerifier.assertInjectionsPerformed(true, false); + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult("null") + .run("show tables") + .verifyResults(new String[] { "t1", "t2", "t3" }); + assertEquals(2, replica.getPrimaryKeyList(replicatedDbName, "t1").size()); + assertEquals(1, replica.getUniqueConstraintList(replicatedDbName, "t3").size()); + assertEquals(1, replica.getNotNullConstraintList(replicatedDbName, "t3").size()); + assertEquals(0, replica.getForeignKeyList(replicatedDbName, "t2").size()); + + // Verify if no create table/function calls. Only add foreign key constraints on table t2. 
+ callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + injectionPathCalled = true; + if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.tblName != null)) { + LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + + " Table: " + String.valueOf(args.tblName)); + return false; + } + if (args.constraintTblName != null) { + LOG.warn("Verifier - Constraint Table: " + String.valueOf(args.constraintTblName)); + return args.constraintTblName.equals("t2"); + } + return true; + } + }; + InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); + + // Retry with same dump with which it was already loaded should resume the bootstrap load. + // This time, it completes by adding just foreign key constraints for table t2. + replica.load(replicatedDbName, tuple.dumpLocation); + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + callerVerifier.assertInjectionsPerformed(true, false); + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2", "t3" }); + assertEquals(2, replica.getPrimaryKeyList(replicatedDbName, "t1").size()); + assertEquals(1, replica.getUniqueConstraintList(replicatedDbName, "t3").size()); + assertEquals(1, replica.getNotNullConstraintList(replicatedDbName, "t3").size()); + assertEquals(2, replica.getForeignKeyList(replicatedDbName, "t2").size()); + } + + @Test + public void testBootstrapReplLoadRetryAfterFailureForPartitions() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("insert into table t1 values (10)") + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='uk') values ('london')") + .run("insert into table t2 partition(country='us') values ('sfo')") + .run("CREATE FUNCTION " + primaryDbName + + ".testFunctionOne as 'hivemall.tools.string.StopwordUDF' " + + "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'") + .dump(primaryDbName, null); + + WarehouseInstance.Tuple tuple2 = primary + .run("use " + primaryDbName) + .dump(primaryDbName, null); + + // Inject a behavior where REPL LOAD failed when try to load table "t2" and partition "uk". + // So, table "t1" and "t2" will exist and partition "india" will exist, rest failed as operation failed. 
+ BehaviourInjection<Partition, Partition> getPartitionStub + = new BehaviourInjection<Partition, Partition>() { + @Nullable + @Override + public Partition apply(@Nullable Partition ptn) { + if (ptn.getValues().get(0).equals("india")) { + injectionPathCalled = true; + LOG.warn("####getPartition Stub called"); + return null; + } + return ptn; + } + }; + InjectableBehaviourObjectStore.setGetPartitionBehaviour(getPartitionStub); + + List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'"); + replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs); + InjectableBehaviourObjectStore.resetGetPartitionBehaviour(); // reset the behaviour + getPartitionStub.assertInjectionsPerformed(true, false); + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult("null") + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("10")) + .run("select country from t2 order by country") + .verifyResults(Arrays.asList("india")) + .run("show functions like '" + replicatedDbName + "*'") + .verifyResult(replicatedDbName + ".testFunctionOne"); + + // Retry with different dump should fail. + replica.loadFailure(replicatedDbName, tuple2.dumpLocation); + + // Verify if no create table/function calls. Only add partitions. + BehaviourInjection<CallerArguments, Boolean> callerVerifier + = new BehaviourInjection<CallerArguments, Boolean>() { + @Nullable + @Override + public Boolean apply(@Nullable CallerArguments args) { + if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.tblName != null) || (args.funcName != null)) { + injectionPathCalled = true; + LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + + " Table: " + String.valueOf(args.tblName) + + " Func: " + String.valueOf(args.funcName)); + return false; + } + return true; + } + }; + InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier); + + // Retry with same dump with which it was already loaded should resume the bootstrap load. + // This time, it completes by adding remaining partitions. 
+ replica.load(replicatedDbName, tuple.dumpLocation); + InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour + callerVerifier.assertInjectionsPerformed(false, false); + + replica.run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2" }) + .run("select id from t1") + .verifyResults(Arrays.asList("10")) + .run("select country from t2 order by country") + .verifyResults(Arrays.asList("india", "uk", "us")) + .run("show functions like '" + replicatedDbName + "*'") + .verifyResult(replicatedDbName + ".testFunctionOne"); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index b38965c..62f67b4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -30,9 +30,19 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.IDriver; @@ -130,6 +140,8 @@ public class WarehouseInstance implements Closeable { if (!hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER).equals("org.apache.hadoop.hive.ql.lockmgr.DbTxnManager")) { hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); } + hiveConf.set(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL.varname, + "org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore"); System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); @@ -257,12 +269,19 @@ public class WarehouseInstance implements Closeable { } WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation) throws Throwable { - runFailure("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); - printOutput(); runFailure("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); return this; } + WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation, List<String> withClauseOptions) + throws Throwable { + String replLoadCmd = "REPL LOAD " + 
replicatedDbName + " FROM '" + dumpLocation + "'"; + if (!withClauseOptions.isEmpty()) { + replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")"; + } + return runFailure(replLoadCmd); + } + WarehouseInstance verifyResult(String data) throws IOException { verifyResults(data == null ? new String[] {} : new String[] { data }); return this; @@ -331,15 +350,56 @@ public class WarehouseInstance implements Closeable { } public Database getDatabase(String dbName) throws Exception { - return client.getDatabase(dbName); + try { + return client.getDatabase(dbName); + } catch (NoSuchObjectException e) { + return null; + } + } + + public List<String> getAllTables(String dbName) throws Exception { + return client.getAllTables(dbName); } public Table getTable(String dbName, String tableName) throws Exception { - return client.getTable(dbName, tableName); + try { + return client.getTable(dbName, tableName); + } catch (NoSuchObjectException e) { + return null; + } + } + + public List<Partition> getAllPartitions(String dbName, String tableName) throws Exception { + try { + return client.listPartitions(dbName, tableName, Short.MAX_VALUE); + } catch (NoSuchObjectException e) { + return null; + } } public Partition getPartition(String dbName, String tableName, List<String> partValues) throws Exception { - return client.getPartition(dbName, tableName, partValues); + try { + return client.getPartition(dbName, tableName, partValues); + } catch (NoSuchObjectException e) { + return null; + } + } + + public List<SQLPrimaryKey> getPrimaryKeyList(String dbName, String tblName) throws Exception { + return client.getPrimaryKeys(new PrimaryKeysRequest(dbName, tblName)); + } + + public List<SQLForeignKey> getForeignKeyList(String dbName, String tblName) throws Exception { + return client.getForeignKeys(new ForeignKeysRequest(null, null, dbName, tblName)); + } + + public List<SQLUniqueConstraint> getUniqueConstraintList(String dbName, String tblName) throws Exception { + return client.getUniqueConstraints(new UniqueConstraintsRequest(Warehouse.DEFAULT_CATALOG_NAME, dbName, tblName)); + } + + public List<SQLNotNullConstraint> getNotNullConstraintList(String dbName, String tblName) throws Exception { + return client.getNotNullConstraints( + new NotNullConstraintsRequest(Warehouse.DEFAULT_CATALOG_NAME, dbName, tblName)); } ReplicationV1CompatRule getReplivationV1CompatRule(List<String> testsToSkip) { http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java index cbec3ad..18a8304 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java @@ -18,23 +18,74 @@ package org.apache.hadoop.hive.ql.exec.repl; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; +import 
org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.stats.StatsUtils; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; public class ReplUtils { public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key"; + /** + * Bootstrap REPL LOAD operation type on the examined object based on ckpt state. + */ + public enum ReplLoadOpType { + LOAD_NEW, LOAD_SKIP, LOAD_REPLACE + } + + public static Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs( + Table table, List<Map<String, String>> partitions) throws SemanticException { + Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>(); + int partPrefixLength = 0; + if (partitions.size() > 0) { + partPrefixLength = partitions.get(0).size(); + // pick the length of the first ptn, we expect all ptns listed to have the same number of + // key-vals. + } + List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>(); + for (Map<String, String> ptn : partitions) { + // convert each key-value-map to appropriate expression. + ExprNodeGenericFuncDesc expr = null; + for (Map.Entry<String, String> kvp : ptn.entrySet()) { + String key = kvp.getKey(); + Object val = kvp.getValue(); + String type = table.getPartColByName(key).getType(); + PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type); + ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true); + ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate( + "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val)); + expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op); + } + if (expr != null) { + partitionDesc.add(expr); + } + } + if (partitionDesc.size() > 0) { + partSpecs.put(partPrefixLength, partitionDesc); + } + return partSpecs; + } + public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf) throws SemanticException { ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType()); @@ -55,4 +106,18 @@ public class ReplUtils { } return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterTblDesc), conf); } + + public static boolean replCkptStatus(String dbName, Map<String, String> props, String dumpRoot) + throws InvalidOperationException { + // If ckpt property not set or empty means, bootstrap is not run on this object. 
+ if ((props != null) && props.containsKey(REPL_CHECKPOINT_KEY) && !props.get(REPL_CHECKPOINT_KEY).isEmpty()) { + if (props.get(REPL_CHECKPOINT_KEY).equals(dumpRoot)) { + return true; + } + throw new InvalidOperationException("REPL LOAD with Dump: " + dumpRoot + + " is not allowed as the target DB: " + dbName + + " is already bootstrap loaded by another Dump " + props.get(REPL_CHECKPOINT_KEY)); + } + return false; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index 97917f8..76fb2a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -112,8 +112,10 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); } work.updateDbEventState(dbEvent.toState()); - scope.database = true; - scope.rootTasks.addAll(dbTracker.tasks()); + if (dbTracker.hasTasks()) { + scope.rootTasks.addAll(dbTracker.tasks()); + scope.database = true; + } dbTracker.debugLog("database"); break; case Table: { @@ -129,11 +131,11 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(), tableContext, loadTaskTracker); tableTracker = loadTable.tasks(); - if (!scope.database) { + setUpDependencies(dbTracker, tableTracker); + if (!scope.database && tableTracker.hasTasks()) { scope.rootTasks.addAll(tableTracker.tasks()); scope.table = true; } - setUpDependencies(dbTracker, tableTracker); /* for table replication if we reach the max number of tasks then for the next run we will try to reload the same table again, this is mainly for ease of understanding the code @@ -285,9 +287,15 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { This sets up dependencies such that a child task is dependant on the parent to be complete. */ private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) { - for (Task<? extends Serializable> parentTask : parentTasks.tasks()) { + if (parentTasks.hasTasks()) { + for (Task<? extends Serializable> parentTask : parentTasks.tasks()) { + for (Task<? extends Serializable> childTask : childTasks.tasks()) { + parentTask.addDependentTask(childTask); + } + } + } else { for (Task<? 
extends Serializable> childTask : childTasks.tasks()) { - parentTask.addDependentTask(childTask); + parentTasks.addTask(childTask); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index 0fabf5a..d203ae4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -54,7 +54,7 @@ public class FSTableEvent implements TableEvent { } public boolean shouldNotReplicate() { - ReplicationSpec spec = metadata.getReplicationSpec(); + ReplicationSpec spec = replicationSpec(); return spec.isNoop() || !spec.isInReplicationScope(); } @@ -69,7 +69,7 @@ public class FSTableEvent implements TableEvent { Table table = new Table(metadata.getTable()); ImportTableDesc tableDesc = new ImportTableDesc(StringUtils.isBlank(dbName) ? table.getDbName() : dbName, table); - tableDesc.setReplicationSpec(metadata.getReplicationSpec()); + tableDesc.setReplicationSpec(replicationSpec()); return tableDesc; } catch (Exception e) { throw new SemanticException(e); @@ -122,7 +122,7 @@ public class FSTableEvent implements TableEvent { partDesc.setSortCols(partition.getSd().getSortCols()); partDesc.setLocation(new Path(fromPath, Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString()); - partsDesc.setReplicationSpec(metadata.getReplicationSpec()); + partsDesc.setReplicationSpec(replicationSpec()); return partsDesc; } catch (Exception e) { throw new SemanticException(e); http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java index de17d70..26f4892 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java @@ -17,8 +17,20 @@ */ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; +import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; +import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage; +import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage; +import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage; +import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.ql.ErrorMsg; import 
org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent; @@ -50,6 +62,7 @@ public class LoadConstraint { private final ConstraintEvent event; private final String dbNameToLoadIn; private final TaskTracker tracker; + private final MessageDeserializer deserializer = MessageFactory.getInstance().getDeserializer(); public LoadConstraint(Context context, ConstraintEvent event, String dbNameToLoadIn, TaskTracker existingTracker) { @@ -73,7 +86,7 @@ public class LoadConstraint { String nnsString = json.getString("nns"); List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); - if (pksString != null && !pksString.isEmpty()) { + if (pksString != null && !pksString.isEmpty() && !isPrimaryKeysAlreadyLoaded(pksString)) { AddPrimaryKeyHandler pkHandler = new AddPrimaryKeyHandler(); DumpMetaData pkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_PRIMARYKEY, Long.MAX_VALUE, Long.MAX_VALUE, null, context.hiveConf); @@ -84,7 +97,7 @@ public class LoadConstraint { context.hiveDb, context.nestedContext, LOG))); } - if (uksString != null && !uksString.isEmpty()) { + if (uksString != null && !uksString.isEmpty() && !isUniqueConstraintsAlreadyLoaded(uksString)) { AddUniqueConstraintHandler ukHandler = new AddUniqueConstraintHandler(); DumpMetaData ukDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_UNIQUECONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null, context.hiveConf); @@ -95,7 +108,7 @@ public class LoadConstraint { context.hiveDb, context.nestedContext, LOG))); } - if (nnsString != null && !nnsString.isEmpty()) { + if (nnsString != null && !nnsString.isEmpty() && !isNotNullConstraintsAlreadyLoaded(nnsString)) { AddNotNullConstraintHandler nnHandler = new AddNotNullConstraintHandler(); DumpMetaData nnDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_NOTNULLCONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null, context.hiveConf); @@ -106,7 +119,7 @@ public class LoadConstraint { context.hiveDb, context.nestedContext, LOG))); } - if (fksString != null && !fksString.isEmpty()) { + if (fksString != null && !fksString.isEmpty() && !isForeignKeysAlreadyLoaded(fksString)) { AddForeignKeyHandler fkHandler = new AddForeignKeyHandler(); DumpMetaData fkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_FOREIGNKEY, Long.MAX_VALUE, Long.MAX_VALUE, null, context.hiveConf); @@ -124,4 +137,72 @@ public class LoadConstraint { } } + private boolean isPrimaryKeysAlreadyLoaded(String pksMsgString) throws Exception { + AddPrimaryKeyMessage msg = deserializer.getAddPrimaryKeyMessage(pksMsgString); + List<SQLPrimaryKey> pksInMsg = msg.getPrimaryKeys(); + if (pksInMsg.isEmpty()) { + return true; + } + + String dbName = StringUtils.isBlank(dbNameToLoadIn) ? pksInMsg.get(0).getTable_db() : dbNameToLoadIn; + List<SQLPrimaryKey> pks; + try { + pks = context.hiveDb.getPrimaryKeyList(dbName, pksInMsg.get(0).getTable_name()); + } catch (NoSuchObjectException e) { + return false; + } + return ((pks != null) && !pks.isEmpty()); + } + + private boolean isForeignKeysAlreadyLoaded(String fksMsgString) throws Exception { + AddForeignKeyMessage msg = deserializer.getAddForeignKeyMessage(fksMsgString); + List<SQLForeignKey> fksInMsg = msg.getForeignKeys(); + if (fksInMsg.isEmpty()) { + return true; + } + + String dbName = StringUtils.isBlank(dbNameToLoadIn) ? 
fksInMsg.get(0).getFktable_db() : dbNameToLoadIn; + List<SQLForeignKey> fks; + try { + fks = context.hiveDb.getForeignKeyList(dbName, fksInMsg.get(0).getFktable_name()); + } catch (NoSuchObjectException e) { + return false; + } + return ((fks != null) && !fks.isEmpty()); + } + + private boolean isUniqueConstraintsAlreadyLoaded(String uksMsgString) throws Exception { + AddUniqueConstraintMessage msg = deserializer.getAddUniqueConstraintMessage(uksMsgString); + List<SQLUniqueConstraint> uksInMsg = msg.getUniqueConstraints(); + if (uksInMsg.isEmpty()) { + return true; + } + + String dbName = StringUtils.isBlank(dbNameToLoadIn) ? uksInMsg.get(0).getTable_db() : dbNameToLoadIn; + List<SQLUniqueConstraint> uks; + try { + uks = context.hiveDb.getUniqueConstraintList(dbName, uksInMsg.get(0).getTable_name()); + } catch (NoSuchObjectException e) { + return false; + } + return ((uks != null) && !uks.isEmpty()); + } + + private boolean isNotNullConstraintsAlreadyLoaded(String nnsMsgString) throws Exception { + AddNotNullConstraintMessage msg = deserializer.getAddNotNullConstraintMessage(nnsMsgString); + List<SQLNotNullConstraint> nnsInMsg = msg.getNotNullConstraints(); + if (nnsInMsg.isEmpty()) { + return true; + } + + String dbName = StringUtils.isBlank(dbNameToLoadIn) ? nnsInMsg.get(0).getTable_db() : dbNameToLoadIn; + List<SQLNotNullConstraint> nns; + try { + nns = context.hiveDb.getNotNullConstraintList(dbName, nnsInMsg.get(0).getTable_name()); + } catch (NoSuchObjectException e) { + return false; + } + return ((nns != null) && !nns.isEmpty()); + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java index c5f2779..0270d2a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.PrincipalDesc; import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType; import java.io.Serializable; import java.util.HashMap; @@ -58,11 +59,23 @@ public class LoadDatabase { public TaskTracker tasks() throws SemanticException { try { Database dbInMetadata = readDbMetadata(); - Task<? extends Serializable> dbRootTask = existEmptyDb(dbInMetadata.getName()) - ? alterDbTask(dbInMetadata, context.hiveConf) - : createDbTask(dbInMetadata); - dbRootTask.addDependentTask(setOwnerInfoTask(dbInMetadata)); - tracker.addTask(dbRootTask); + String dbName = dbInMetadata.getName(); + Task<? 
extends Serializable> dbRootTask = null; + ReplLoadOpType loadDbType = getLoadDbType(dbName); + switch (loadDbType) { + case LOAD_NEW: + dbRootTask = createDbTask(dbInMetadata); + break; + case LOAD_REPLACE: + dbRootTask = alterDbTask(dbInMetadata); + break; + default: + break; + } + if (dbRootTask != null) { + dbRootTask.addDependentTask(setOwnerInfoTask(dbInMetadata)); + tracker.addTask(dbRootTask); + } return tracker; } catch (Exception e) { throw new SemanticException(e); @@ -73,24 +86,44 @@ public class LoadDatabase { return event.dbInMetadata(dbNameToLoadIn); } + private ReplLoadOpType getLoadDbType(String dbName) throws InvalidOperationException, HiveException { + Database db = context.hiveDb.getDatabase(dbName); + if (db == null) { + return ReplLoadOpType.LOAD_NEW; + } + if (isDbAlreadyBootstrapped(db)) { + throw new InvalidOperationException("Bootstrap REPL LOAD is not allowed on Database: " + dbName + + " as it was already done."); + } + if (ReplUtils.replCkptStatus(dbName, db.getParameters(), context.dumpDirectory)) { + return ReplLoadOpType.LOAD_SKIP; + } + if (isDbEmpty(dbName)) { + return ReplLoadOpType.LOAD_REPLACE; + } + throw new InvalidOperationException("Bootstrap REPL LOAD is not allowed on Database: " + dbName + + " as it is not empty. One or more tables/functions exist."); + } + + private boolean isDbAlreadyBootstrapped(Database db) { + Map<String, String> props = db.getParameters(); + return ((props != null) + && props.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()) + && !props.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()).isEmpty()); + } + + private boolean isDbEmpty(String dbName) throws HiveException { + List<String> allTables = context.hiveDb.getAllTables(dbName); + List<String> allFunctions = context.hiveDb.getFunctions(dbName, "*"); + return allTables.isEmpty() && allFunctions.isEmpty(); + } + private Task<? extends Serializable> createDbTask(Database dbObj) { CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc(); createDbDesc.setName(dbObj.getName()); createDbDesc.setComment(dbObj.getDescription()); + createDbDesc.setDatabaseProperties(updateDbProps(dbObj, context.dumpDirectory)); - /* - explicitly remove the setting of last.repl.id from the db object parameters as loadTask is going - to run multiple times and explicit logic is in place which prevents updates to tables when db level - last repl id is set and we create a AlterDatabaseTask at the end of processing a database. - */ - Map<String, String> parameters = new HashMap<>(dbObj.getParameters()); - parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString()); - - // Add the checkpoint key to the Database binding it to current dump directory. - // So, if retry using same dump, we shall skip Database object update. - parameters.put(ReplUtils.REPL_CHECKPOINT_KEY, context.dumpDirectory); - - createDbDesc.setDatabaseProperties(parameters); // note that we do not set location - for repl load, we want that auto-created. createDbDesc.setIfNotExists(false); // If it exists, we want this to be an error condition. Repl Load is not intended to replace a @@ -100,11 +133,8 @@ public class LoadDatabase { return TaskFactory.get(work, context.hiveConf); } - private static Task<? extends Serializable> alterDbTask(Database dbObj, HiveConf hiveConf) { - AlterDatabaseDesc alterDbDesc = - new AlterDatabaseDesc(dbObj.getName(), dbObj.getParameters(), null); - DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc); - return TaskFactory.get(work, hiveConf); + private Task<? 
extends Serializable> alterDbTask(Database dbObj) { + return alterDbTask(dbObj.getName(), updateDbProps(dbObj, context.dumpDirectory), context.hiveConf); } private Task<? extends Serializable> setOwnerInfoTask(Database dbObj) { @@ -115,18 +145,27 @@ public class LoadDatabase { return TaskFactory.get(work, context.hiveConf); } - private boolean existEmptyDb(String dbName) throws InvalidOperationException, HiveException { - Database db = context.hiveDb.getDatabase(dbName); - if (db == null) { - return false; - } - List<String> allTables = context.hiveDb.getAllTables(dbName); - List<String> allFunctions = context.hiveDb.getFunctions(dbName, "*"); - if (allTables.isEmpty() && allFunctions.isEmpty()) { - return true; - } - throw new InvalidOperationException( - "Database " + db.getName() + " is not empty. One or more tables/functions exist."); + private static Map<String, String> updateDbProps(Database dbObj, String dumpDirectory) { + /* + explicitly remove the setting of last.repl.id from the db object parameters as loadTask is going + to run multiple times and explicit logic is in place which prevents updates to tables when db level + last repl id is set and we create a AlterDatabaseTask at the end of processing a database. + */ + Map<String, String> parameters = new HashMap<>(dbObj.getParameters()); + parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + + // Add the checkpoint key to the Database binding it to current dump directory. + // So, if retry using same dump, we shall skip Database object update. + parameters.put(ReplUtils.REPL_CHECKPOINT_KEY, dumpDirectory); + return parameters; + } + + private static Task<? extends Serializable> alterDbTask(String dbName, Map<String, String> props, + HiveConf hiveConf) { + AlterDatabaseDesc alterDbDesc = + new AlterDatabaseDesc(dbName, props, null); + DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc); + return TaskFactory.get(work, hiveConf); } public static class AlterDatabase extends LoadDatabase { @@ -138,7 +177,8 @@ public class LoadDatabase { @Override public TaskTracker tasks() throws SemanticException { - tracker.addTask(alterDbTask(readDbMetadata(), context.hiveConf)); + Database dbObj = readDbMetadata(); + tracker.addTask(alterDbTask(dbObj.getName(), dbObj.getParameters(), context.hiveConf)); return tracker; } } http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java index c100344..b886ff4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java @@ -17,7 +17,11 @@ */ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -26,9 +30,11 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves; import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; import org.slf4j.Logger; @@ -71,6 +77,9 @@ public class LoadFunction { Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); try { + if (isFunctionAlreadyLoaded(fromPath)) { + return tracker; + } CreateFunctionHandler handler = new CreateFunctionHandler(); List<Task<? extends Serializable>> tasks = handler.handle( new MessageHandler.Context( @@ -85,4 +94,21 @@ public class LoadFunction { } } + private boolean isFunctionAlreadyLoaded(Path funcDumpRoot) throws HiveException, IOException { + Path metadataPath = new Path(funcDumpRoot, EximUtil.METADATA_NAME); + FileSystem fs = FileSystem.get(metadataPath.toUri(), context.hiveConf); + MetaData metadata = EximUtil.readMetaData(fs, metadataPath); + Function function; + try { + String dbName = StringUtils.isBlank(dbNameToLoadIn) ? metadata.function.getDbName() : dbNameToLoadIn; + function = context.hiveDb.getFunction(dbName, metadata.function.getFunctionName()); + } catch (HiveException e) { + if (e.getCause() instanceof NoSuchObjectException) { + return false; + } + throw e; + } + return (function != null); + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index f5b8e86..f6493f7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.ReplCopyTask; @@ -27,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; @@ -37,11 +39,12 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import 
org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DropTableDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; @@ -166,6 +169,13 @@ public class LoadPartitions { } private TaskTracker forNewTable() throws Exception { + Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName()); + // If table doesn't exist, allow creating a new one only if the database state is older than the update. + // This in-turn applicable for partitions creation as well. + if ((parentDb != null) && (!event.replicationSpec().allowReplacementInto(parentDb.getParameters()))) { + return tracker; + } + Iterator<AddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator(); while (iterator.hasNext() && tracker.canAddMoreTasks()) { AddPartitionDesc currentPartitionDesc = iterator.next(); @@ -173,13 +183,14 @@ public class LoadPartitions { the currentPartitionDesc cannot be inlined as we need the hasNext() to be evaluated post the current retrieved lastReplicatedPartition */ - addPartition(iterator.hasNext(), currentPartitionDesc); + addPartition(iterator.hasNext(), currentPartitionDesc, null); } return tracker; } - private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartitionDesc) throws Exception { - tracker.addTask(tasksForAddPartition(table, addPartitionDesc)); + private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartitionDesc, Task<?> ptnRootTask) + throws Exception { + tracker.addTask(tasksForAddPartition(table, addPartitionDesc, ptnRootTask)); if (hasMorePartitions && !tracker.canAddMoreTasks()) { ReplicationState currentReplicationState = new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc)); @@ -190,29 +201,36 @@ public class LoadPartitions { /** * returns the root task for adding a partition */ - private Task<? 
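
The forNewTable() guard above only creates the table (and, by extension, its partitions) when the destination database's replication state is older than the update being applied. Roughly, allowReplacementInto() compares the destination's recorded last-replication id with the incoming state; the sketch below captures that comparison under the assumption that the ids are numeric, and the key name shown is a placeholder:

import java.util.Map;

public class ReplacementGuard {
    static final String LAST_REPL_ID_KEY = "repl.last.id"; // hypothetical key name

    /** Replacement is allowed only when the destination carries no recorded state,
     *  or its recorded state is strictly older than the event being applied. */
    static boolean allowReplacementInto(Map<String, String> destParams, long incomingEventId) {
        String current = destParams.get(LAST_REPL_ID_KEY);
        if (current == null) {
            return true;
        }
        return Long.parseLong(current) < incomingEventId;
    }

    public static void main(String[] args) {
        System.out.println(allowReplacementInto(Map.of(LAST_REPL_ID_KEY, "250"), 200)); // false: noop
        System.out.println(allowReplacementInto(Map.of(LAST_REPL_ID_KEY, "150"), 200)); // true: apply
    }
}
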
extends Serializable> tasksForAddPartition(Table table, - AddPartitionDesc addPartitionDesc) throws MetaException, IOException, HiveException { + private Task<?> tasksForAddPartition(Table table, AddPartitionDesc addPartitionDesc, Task<?> ptnRootTask) + throws MetaException, IOException, HiveException { + Task<?> addPartTask = TaskFactory.get( + new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc), + context.hiveConf + ); + if (event.replicationSpec().isMetadataOnly()) { + if (ptnRootTask == null) { + ptnRootTask = addPartTask; + } else { + ptnRootTask.addDependentTask(addPartTask); + } + return ptnRootTask; + } + AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation()); Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec); partSpec.setLocation(replicaWarehousePartitionLocation.toString()); LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition " - + partSpecToString(partSpec.getPartSpec()) + " with source location: " - + partSpec.getLocation()); - Path tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); + + partSpecToString(partSpec.getPartSpec()) + " with source location: " + + partSpec.getLocation()); + Path tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); Task<?> copyTask = ReplCopyTask.getLoadCopyTask( event.replicationSpec(), sourceWarehousePartitionLocation, tmpPath, context.hiveConf ); - - Task<?> addPartTask = TaskFactory.get( - new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc), - context.hiveConf - ); - Task<?> movePartitionTask = movePartitionTask(table, partSpec, tmpPath); // Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for @@ -224,11 +242,16 @@ public class LoadPartitions { context.hiveConf ); + if (ptnRootTask == null) { + ptnRootTask = copyTask; + } else { + ptnRootTask.addDependentTask(copyTask); + } copyTask.addDependentTask(addPartTask); addPartTask.addDependentTask(movePartitionTask); movePartitionTask.addDependentTask(ckptTask); - return copyTask; + return ptnRootTask; } /** @@ -271,17 +294,18 @@ public class LoadPartitions { } } - private Task<? 
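
tasksForAddPartition() now threads an optional root task (the drop-partition task in the replace case) ahead of the copy work, and chains copy -> add-partition -> move -> checkpoint behind it. A tiny standalone model of that wiring; the Task class here is a stand-in for Hive's Task<? extends Serializable>:

import java.util.ArrayList;
import java.util.List;

public class TaskChainSketch {
    // Minimal stand-in for a Hive task that can have dependent tasks.
    static final class Task {
        final String name;
        final List<Task> dependents = new ArrayList<>();
        Task(String name) { this.name = name; }
        void addDependentTask(Task t) { dependents.add(t); }
    }

    /** Chain copy -> add-partition -> move -> set-checkpoint, hanging the chain off an
     *  optional root (e.g. a drop-partition task when replacing) if one was supplied. */
    static Task tasksForAddPartition(Task optionalRoot) {
        Task copy = new Task("copy-data");
        Task addPart = new Task("add-partition");
        Task move = new Task("move-into-place");
        Task checkpoint = new Task("set-checkpoint-property");

        Task root = (optionalRoot == null) ? copy : optionalRoot;
        if (optionalRoot != null) {
            optionalRoot.addDependentTask(copy);
        }
        copy.addDependentTask(addPart);
        addPart.addDependentTask(move);
        move.addDependentTask(checkpoint);
        return root;
    }

    public static void main(String[] args) {
        System.out.println(tasksForAddPartition(null).name);                      // copy-data
        System.out.println(tasksForAddPartition(new Task("drop-partition")).name); // drop-partition
    }
}
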
extends Serializable> alterSinglePartition(AddPartitionDesc desc, - ReplicationSpec replicationSpec, Partition ptn) { - desc.setReplaceMode(true); - if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) { - desc.setReplicationSpec(replicationSpec); + private Task<?> dropPartitionTask(Table table, Map<String, String> partSpec) throws SemanticException { + Task<DDLWork> dropPtnTask = null; + Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecsExpr = + ReplUtils.genPartSpecs(table, Collections.singletonList(partSpec)); + if (partSpecsExpr.size() > 0) { + DropTableDesc dropPtnDesc = new DropTableDesc(table.getFullyQualifiedName(), + partSpecsExpr, null, true, event.replicationSpec()); + dropPtnTask = TaskFactory.get( + new DDLWork(new HashSet<>(), new HashSet<>(), dropPtnDesc), context.hiveConf + ); } - desc.getPartition(0).setLocation(ptn.getLocation()); // use existing location - return TaskFactory.get( - new DDLWork(new HashSet<>(), new HashSet<>(), desc), - context.hiveConf - ); + return dropPtnTask; } private TaskTracker forExistingTable(AddPartitionDesc lastPartitionReplicated) throws Exception { @@ -293,7 +317,6 @@ public class LoadPartitions { StringUtils.mapToString(lastReplicatedPartSpec)); } - ReplicationSpec replicationSpec = event.replicationSpec(); Iterator<AddPartitionDesc> partitionIterator = event.partitionDescriptions(tableDesc).iterator(); while (!encounteredTheLastReplicatedPartition && partitionIterator.hasNext()) { AddPartitionDesc addPartitionDesc = partitionIterator.next(); @@ -304,33 +327,33 @@ public class LoadPartitions { while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) { AddPartitionDesc addPartitionDesc = partitionIterator.next(); Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec(); - Partition ptn = context.hiveDb.getPartition(table, partSpec, false); - if (ptn == null) { - if (!replicationSpec.isMetadataOnly()) { - addPartition(partitionIterator.hasNext(), addPartitionDesc); - } - } else { - // If replicating, then the partition already existing means we need to replace, maybe, if - // the destination ptn's repl.last.id is older than the replacement's. - if (replicationSpec.allowReplacementInto(ptn.getParameters())) { - if (replicationSpec.isMetadataOnly()) { - tracker.addTask(alterSinglePartition(addPartitionDesc, replicationSpec, ptn)); - if (!tracker.canAddMoreTasks()) { - tracker.setReplicationState( - new ReplicationState( - new PartitionState(table.getTableName(), addPartitionDesc) - ) - ); - } - } else { - addPartition(partitionIterator.hasNext(), addPartitionDesc); - } - } else { - // ignore this ptn, do nothing, not an error. 
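
forExistingTable() resumes a partition load that was cut short in an earlier cycle: it fast-forwards past everything up to and including the partition recorded in the replication state, then keeps adding work while the tracker can accept more tasks. A simplified sketch of that resume loop (partition names and the numeric task budget are illustrative):

import java.util.Iterator;
import java.util.List;

public class ResumePartitionsSketch {
    /** Skip partitions already replicated in an earlier cycle, then queue the rest
     *  until the per-cycle task budget is exhausted; returns how many were queued. */
    static int resumeFrom(List<String> partitionsInDump, String lastReplicated, int taskBudget) {
        Iterator<String> it = partitionsInDump.iterator();
        if (lastReplicated != null) {
            while (it.hasNext() && !it.next().equals(lastReplicated)) {
                // fast-forward past partitions handled before the interruption
            }
        }
        int added = 0;
        while (it.hasNext() && added < taskBudget) {
            it.next();   // in the real code this becomes an addPartition(...) task
            added++;
        }
        return added;
    }

    public static void main(String[] args) {
        List<String> parts = List.of("ds=01", "ds=02", "ds=03", "ds=04", "ds=05");
        System.out.println(resumeFrom(parts, "ds=02", 2)); // 2 -> ds=03 and ds=04 queued
    }
}
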
- } + Task<?> ptnRootTask = null; + ReplLoadOpType loadPtnType = getLoadPartitionType(partSpec); + switch (loadPtnType) { + case LOAD_NEW: + break; + case LOAD_REPLACE: + ptnRootTask = dropPartitionTask(table, partSpec); + break; + case LOAD_SKIP: + continue; + default: + break; } + addPartition(partitionIterator.hasNext(), addPartitionDesc, ptnRootTask); } return tracker; } + + private ReplLoadOpType getLoadPartitionType(Map<String, String> partSpec) throws InvalidOperationException, HiveException { + Partition ptn = context.hiveDb.getPartition(table, partSpec, false); + if (ptn == null) { + return ReplLoadOpType.LOAD_NEW; + } + if (ReplUtils.replCkptStatus(tableContext.dbNameToLoadIn, ptn.getParameters(), context.dumpDirectory)) { + return ReplLoadOpType.LOAD_SKIP; + } + return ReplLoadOpType.LOAD_REPLACE; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 6d093fb..419a511 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -27,17 +28,21 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DropTableDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; @@ -88,7 +93,6 @@ public class LoadTable { // Executed if relevant, and used to contain all the other details about the table if not. 
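
getLoadPartitionType() above (and getLoadTableType() in LoadTable) collapse the retry decision into three cases: the object is absent, it carries a checkpoint for the current dump, or it exists without one. The sketch below mirrors that decision; the real replCkptStatus() also consults the database-level checkpoint, which is omitted here, and the key name is a placeholder:

import java.util.Map;

public class LoadOpTypeSketch {
    enum ReplLoadOpType { LOAD_NEW, LOAD_SKIP, LOAD_REPLACE } // mirrors ReplUtils.ReplLoadOpType
    static final String CHECKPOINT_KEY = "hive.repl.ckpt.key"; // hypothetical key name

    /** Decide what to do with an object on retry of a bootstrap load: absent -> create it;
     *  checkpointed with this dump -> already loaded, skip; anything else -> it is stale
     *  or partially loaded, so drop and replace. */
    static ReplLoadOpType loadTypeFor(Map<String, String> existingParams, String currentDumpDir) {
        if (existingParams == null) {
            return ReplLoadOpType.LOAD_NEW;
        }
        if (currentDumpDir.equals(existingParams.get(CHECKPOINT_KEY))) {
            return ReplLoadOpType.LOAD_SKIP;
        }
        return ReplLoadOpType.LOAD_REPLACE;
    }

    public static void main(String[] args) {
        String dump = "/repl/dump/100";
        System.out.println(loadTypeFor(null, dump));                          // LOAD_NEW
        System.out.println(loadTypeFor(Map.of(CHECKPOINT_KEY, dump), dump));  // LOAD_SKIP
        System.out.println(loadTypeFor(Map.of("other", "x"), dump));          // LOAD_REPLACE
    }
}
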
ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName)); Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb); - ReplicationSpec replicationSpec = event.replicationSpec(); // Normally, on import, trying to create a table or a partition in a db that does not yet exist // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying @@ -106,25 +110,24 @@ public class LoadTable { } } - if (table == null) { - // If table doesn't exist, allow creating a new one only if the database state is older than the update. - if ((parentDb != null) && (!replicationSpec - .allowReplacementInto(parentDb.getParameters()))) { - // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it. + Task<?> tblRootTask = null; + ReplLoadOpType loadTblType = getLoadTableType(table); + switch (loadTblType) { + case LOAD_NEW: + break; + case LOAD_REPLACE: + tblRootTask = dropTableTask(table); + break; + case LOAD_SKIP: return tracker; - } - } else { - if (!replicationSpec.allowReplacementInto(table.getParameters())) { - // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it. - return tracker; - } + default: + break; } if (tableDesc.getLocation() == null) { tableDesc.setLocation(location(tableDesc, parentDb)); } - /* Note: In the following section, Metadata-only import handling logic is interleaved with regular repl-import logic. The rule of thumb being followed here is that MD-only imports are essentially ALTERs. They do @@ -134,11 +137,7 @@ public class LoadTable { or in the case of an unpartitioned table. In all other cases, it should behave like a noop or a pure MD alter. */ - if (table == null) { - newTableTasks(tableDesc); - } else { - existingTableTasks(tableDesc, table, replicationSpec); - } + newTableTasks(tableDesc, tblRootTask); // Set Checkpoint task as dependant to create table task. So, if same dump is retried for // bootstrap, we skip current table update. @@ -160,54 +159,48 @@ public class LoadTable { } } - private void existingTableTasks(ImportTableDesc tblDesc, Table table, - ReplicationSpec replicationSpec) { - if (!table.isPartitioned()) { - - LOG.debug("table non-partitioned"); - if (!replicationSpec.allowReplacementInto(table.getParameters())) { - return; // silently return, table is newer than our replacement. - } - - Task<? extends Serializable> alterTableTask = alterTableTask(tblDesc, replicationSpec); - if (replicationSpec.isMetadataOnly()) { - tracker.addTask(alterTableTask); - } else { - Task<?> loadTableTask = - loadTableTask(table, replicationSpec, table.getDataLocation(), event.metadataPath()); - alterTableTask.addDependentTask(loadTableTask); - tracker.addTask(alterTableTask); - } + private ReplLoadOpType getLoadTableType(Table table) throws InvalidOperationException, HiveException { + if (table == null) { + return ReplLoadOpType.LOAD_NEW; + } + if (ReplUtils.replCkptStatus(table.getDbName(), table.getParameters(), context.dumpDirectory)) { + return ReplLoadOpType.LOAD_SKIP; } + return ReplLoadOpType.LOAD_REPLACE; } - private void newTableTasks(ImportTableDesc tblDesc) throws Exception { + private void newTableTasks(ImportTableDesc tblDesc, Task<?> tblRootTask) throws Exception { Table table = tblDesc.toTable(context.hiveConf); - // Either we're dropping and re-creating, or the table didn't exist, and we're creating. 
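
The comment above notes that the checkpoint task is made dependent on the create-table work, so the checkpoint property is written only after everything for the table has run. Conceptually that is "attach a finalizer to every leaf of the task DAG", which is the job of AddDependencyToLeaves/DAGTraversal; the generic traversal below is only an illustration of that idea, not those classes:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CheckpointAtLeavesSketch {
    static final class Task {
        final String name;
        final List<Task> children = new ArrayList<>();
        Task(String name) { this.name = name; }
    }

    /** Walk the DAG from the root and hang the checkpoint task off every leaf, so the
     *  checkpoint property is set only once all work for the object has finished. */
    static void addToLeaves(Task root, Task checkpointTask) {
        Deque<Task> stack = new ArrayDeque<>();
        Set<Task> seen = new HashSet<>();
        stack.push(root);
        while (!stack.isEmpty()) {
            Task t = stack.pop();
            if (!seen.add(t)) {
                continue;
            }
            if (t.children.isEmpty()) {
                t.children.add(checkpointTask);
            } else {
                t.children.forEach(stack::push);
            }
        }
    }

    public static void main(String[] args) {
        Task create = new Task("create-table");
        Task load = new Task("load-data");
        create.children.add(load);
        addToLeaves(create, new Task("set-checkpoint"));
        System.out.println(load.children.get(0).name); // set-checkpoint
    }
}
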
+ ReplicationSpec replicationSpec = event.replicationSpec(); Task<?> createTableTask = tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf); - if (event.replicationSpec().isMetadataOnly()) { - tracker.addTask(createTableTask); + if (tblRootTask == null) { + tblRootTask = createTableTask; + } else { + tblRootTask.addDependentTask(createTableTask); + } + if (replicationSpec.isMetadataOnly()) { + tracker.addTask(tblRootTask); return; } Task<?> parentTask = createTableTask; - if (event.replicationSpec().isTransactionalTableDump()) { + if (replicationSpec.isTransactionalTableDump()) { List<String> partNames = isPartitioned(tblDesc) ? event.partitions(tblDesc) : null; ReplTxnWork replTxnWork = new ReplTxnWork(tblDesc.getDatabaseName(), tblDesc.getTableName(), partNames, - event.replicationSpec().getValidWriteIdList(), ReplTxnWork.OperationType.REPL_WRITEID_STATE); + replicationSpec.getValidWriteIdList(), ReplTxnWork.OperationType.REPL_WRITEID_STATE); Task<?> replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf); - createTableTask.addDependentTask(replTxnTask); + parentTask.addDependentTask(replTxnTask); parentTask = replTxnTask; } if (!isPartitioned(tblDesc)) { LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table"); Task<?> loadTableTask = - loadTableTask(table, event.replicationSpec(), new Path(tblDesc.getLocation()), + loadTableTask(table, replicationSpec, new Path(tblDesc.getLocation()), event.metadataPath()); parentTask.addDependentTask(loadTableTask); } - tracker.addTask(createTableTask); + tracker.addTask(tblRootTask); } private String location(ImportTableDesc tblDesc, Database parentDb) @@ -248,12 +241,10 @@ public class LoadTable { return copyTask; } - private Task<? extends Serializable> alterTableTask(ImportTableDesc tableDesc, - ReplicationSpec replicationSpec) { - tableDesc.setReplaceMode(true); - if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) { - tableDesc.setReplicationSpec(replicationSpec); - } - return tableDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf); + private Task<?> dropTableTask(Table table) { + assert(table != null); + DropTableDesc dropTblDesc = new DropTableDesc(table.getFullyQualifiedName(), table.getTableType(), + true, false, event.replicationSpec()); + return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), context.hiveConf); } } http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index b913f69..a4b54c0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -24,7 +24,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; -import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; @@ -353,10 +352,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { // import job in its place. 
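
newTableTasks() above wires the table-level plan: an optional drop task when replacing, the create task, a write-id state task for transactional-table dumps, and the data load unless the load is metadata-only or the table is partitioned (partitions are handled separately by LoadPartitions). The sketch below simply records that ordering as a list of step names:

import java.util.ArrayList;
import java.util.List;

public class TableLoadPlanSketch {
    /** Rough ordering of the bootstrap table-load tasks as wired in newTableTasks(). */
    static List<String> plan(boolean replaceExisting, boolean metadataOnly,
                             boolean transactionalDump, boolean partitioned) {
        List<String> steps = new ArrayList<>();
        if (replaceExisting) {
            steps.add("drop-table");
        }
        steps.add("create-table");
        if (metadataOnly) {
            return steps;   // metadata-only load stops after (re)creating the table object
        }
        if (transactionalDump) {
            steps.add("set-valid-write-id-state");
        }
        if (!partitioned) {
            steps.add("load-table-data");
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(plan(true, false, true, false));
        // [drop-table, create-table, set-valid-write-id-state, load-table-data]
    }
}
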
try { - + assert(path != null); Path loadPath = new Path(path); final FileSystem fs = loadPath.getFileSystem(conf); + // Make fully qualified path for further use. + loadPath = fs.makeQualified(loadPath); + if (!fs.exists(loadPath)) { // supposed dump path does not exist. throw new FileNotFoundException(loadPath.toUri().toString()); http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java index 7281a1c..939884d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java @@ -20,21 +20,15 @@ package org.apache.hadoop.hive.ql.parse.repl.load.message; import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplUtils; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DropTableDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,7 +41,7 @@ public class DropPartitionHandler extends AbstractMessageHandler { String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName; String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName; Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = - genPartSpecs(new Table(msg.getTableObj()), + ReplUtils.genPartSpecs(new Table(msg.getTableObj()), msg.getPartitions()); if (partSpecs.size() > 0) { DropTableDesc dropPtnDesc = new DropTableDesc(actualDbName + "." + actualTblName, @@ -70,37 +64,4 @@ public class DropPartitionHandler extends AbstractMessageHandler { : new SemanticException("Error reading message members", e); } } - - private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(Table table, - List<Map<String, String>> partitions) throws SemanticException { - Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>(); - int partPrefixLength = 0; - if (partitions.size() > 0) { - partPrefixLength = partitions.get(0).size(); - // pick the length of the first ptn, we expect all ptns listed to have the same number of - // key-vals. - } - List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>(); - for (Map<String, String> ptn : partitions) { - // convert each key-value-map to appropriate expression. 
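
DropPartitionHandler now delegates to ReplUtils.genPartSpecs(), which turns each partition key/value map into an equality predicate grouped by the number of key/value pairs (the real helper builds ExprNodeGenericFuncDesc trees and assumes all specs share the same prefix length). A string-based approximation of that shape:

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartSpecPredicateSketch {
    /** Group the per-partition equality predicates by the number of key/value pairs,
     *  mirroring the Map<Integer, List<...>> shape genPartSpecs produces. */
    static Map<Integer, List<String>> genPartSpecs(List<Map<String, String>> partitions) {
        return partitions.stream()
                .collect(Collectors.groupingBy(
                        Map::size,
                        LinkedHashMap::new,
                        Collectors.mapping(PartSpecPredicateSketch::toPredicate, Collectors.toList())));
    }

    private static String toPredicate(Map<String, String> partSpec) {
        return partSpec.entrySet().stream()
                .map(e -> e.getKey() + "='" + e.getValue() + "'")
                .collect(Collectors.joining(" AND "));
    }

    public static void main(String[] args) {
        // Both two-key specs end up grouped under prefix length 2.
        System.out.println(genPartSpecs(List.of(
                Map.of("ds", "2018-06-16", "hr", "23"),
                Map.of("ds", "2018-06-15", "hr", "11"))));
    }
}
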
- ExprNodeGenericFuncDesc expr = null; - for (Map.Entry<String, String> kvp : ptn.entrySet()) { - String key = kvp.getKey(); - Object val = kvp.getValue(); - String type = table.getPartColByName(key).getType(); - PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type); - ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true); - ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate( - "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val)); - expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op); - } - if (expr != null) { - partitionDesc.add(expr); - } - } - if (partitionDesc.size() > 0) { - partSpecs.put(partPrefixLength, partitionDesc); - } - return partSpecs; - } } http://git-wip-us.apache.org/repos/asf/hive/blob/b8596289/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 78b82b4..3e186b7 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -9925,19 +9925,20 @@ public class ObjectStore implements RawStore, Configurable { final String catName, final String parent_db_name_input, final String parent_tbl_name_input, final String foreign_db_name_input, final String foreign_tbl_name_input, boolean allowSql, boolean allowJdo) throws MetaException, NoSuchObjectException { - final String parent_db_name = parent_db_name_input; - final String parent_tbl_name = parent_tbl_name_input; - final String foreign_db_name = foreign_db_name_input; - final String foreign_tbl_name = foreign_tbl_name_input; + final String parent_db_name = (parent_db_name_input != null) ? normalizeIdentifier(parent_db_name_input) : null; + final String parent_tbl_name = (parent_tbl_name_input != null) ? normalizeIdentifier(parent_tbl_name_input) : null; + final String foreign_db_name = (foreign_db_name_input != null) ? normalizeIdentifier(foreign_db_name_input) : null; + final String foreign_tbl_name = (foreign_tbl_name_input != null) + ? normalizeIdentifier(foreign_tbl_name_input) : null; final String db_name; final String tbl_name; if (foreign_tbl_name == null) { // The FK table name might be null if we are retrieving the constraint from the PK side - db_name = parent_db_name_input; - tbl_name = parent_tbl_name_input; + db_name = parent_db_name; + tbl_name = parent_tbl_name; } else { - db_name = foreign_db_name_input; - tbl_name = foreign_tbl_name_input; + db_name = foreign_db_name; + tbl_name = foreign_tbl_name; } return new GetListHelper<SQLForeignKey>(catName, db_name, tbl_name, allowSql, allowJdo) {
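
The ObjectStore hunk above normalizes the parent/foreign db and table identifiers (null-safely) before they are used to resolve foreign-key constraints, since metastore identifiers are stored lower-cased. A minimal null-safe stand-in for that normalization; whether the real normalizeIdentifier() also trims is not shown in this diff, so treat the trim as an assumption:

import java.util.Locale;

public class IdentifierNormalizeSketch {
    /** Null-safe, lower-cased form of a metastore identifier, so lookups and JDO query
     *  parameters match how the names were persisted. */
    static String normalizeIdentifier(String identifier) {
        return (identifier == null) ? null : identifier.trim().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(normalizeIdentifier("  Default "));  // default
        System.out.println(normalizeIdentifier("SrcDB"));       // srcdb
        System.out.println(normalizeIdentifier(null));          // null
    }
}
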