HIVE-19739: Bootstrap REPL LOAD to use checkpoints to validate and skip the 
loaded data/metadata (Sankar Hariappan, reviewed by Mahesh Kumar Behera, 
Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f20311b0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f20311b0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f20311b0

Branch: refs/heads/master
Commit: f20311b00aa57586db630d57d9d9d9b323cc2e96
Parents: bcbd2d5
Author: Sankar Hariappan <sank...@apache.org>
Authored: Thu Jun 14 19:36:03 2018 +0530
Committer: Sankar Hariappan <sank...@apache.org>
Committed: Thu Jun 14 19:36:03 2018 +0530

----------------------------------------------------------------------
 .../TestReplicationScenariosAcidTables.java     |  93 ++++++
 ...TestReplicationScenariosAcrossInstances.java | 332 +++++++++++++++++--
 .../hadoop/hive/ql/parse/WarehouseInstance.java |  70 +++-
 .../hadoop/hive/ql/exec/repl/ReplUtils.java     |  65 ++++
 .../ql/exec/repl/bootstrap/ReplLoadTask.java    |  20 +-
 .../events/filesystem/FSTableEvent.java         |   6 +-
 .../repl/bootstrap/load/LoadConstraint.java     |  89 ++++-
 .../exec/repl/bootstrap/load/LoadDatabase.java  | 112 +++++--
 .../exec/repl/bootstrap/load/LoadFunction.java  |  26 ++
 .../bootstrap/load/table/LoadPartitions.java    | 125 ++++---
 .../repl/bootstrap/load/table/LoadTable.java    |  95 +++---
 .../ql/parse/ReplicationSemanticAnalyzer.java   |   6 +-
 .../repl/load/message/DropPartitionHandler.java |  43 +--
 .../hadoop/hive/metastore/ObjectStore.java      |  17 +-
 .../InjectableBehaviourObjectStore.java         | 119 ++++++-
 15 files changed, 968 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
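
A minimal, illustrative-only Java sketch of the checkpoint idea this patch introduces (not part of
the patch; class, method and object names are hypothetical): bootstrap REPL LOAD stamps every object
it creates (database/table/partition) with the dump directory under the "hive.repl.ckpt.key"
property, so a retry with the same dump can validate the stamp, skip what is already loaded, and
resume from the point of failure.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BootstrapResumeSketch {
  static final String CKPT = "hive.repl.ckpt.key";
  // Per-object parameters on the target warehouse (missing entry = object not created yet).
  static final Map<String, Map<String, String>> target = new LinkedHashMap<>();

  static void bootstrapLoad(String dumpRoot, List<String> objectsInDump, String failAt) {
    for (String obj : objectsInDump) {
      Map<String, String> props = target.computeIfAbsent(obj, k -> new LinkedHashMap<>());
      if (dumpRoot.equals(props.get(CKPT))) {
        System.out.println("skip " + obj + " (checkpoint matches " + dumpRoot + ")");
        continue;                                   // already loaded by this very dump
      }
      if (obj.equals(failAt)) {
        System.out.println("FAILED while loading " + obj);
        return;                                     // simulate a mid-load failure
      }
      props.put(CKPT, dumpRoot);                    // "load" the object and stamp it
      System.out.println("load " + obj);
    }
    System.out.println("bootstrap load complete");
  }

  public static void main(String[] args) {
    List<String> dump = List.of("db:testdb", "table:t1", "table:t2");
    bootstrapLoad("/staging/dump-1", dump, "table:t2"); // first attempt dies at t2
    bootstrapLoad("/staging/dump-1", dump, null);       // retry skips stamped objects, finishes t2
  }
}

Loading the same target from a different dump directory is rejected rather than skipped; that rule is
what ReplUtils.replCkptStatus() below enforces.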


http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 9a2d296..a1498ca 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -28,6 +28,9 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import org.junit.rules.TestName;
 import org.junit.rules.TestRule;
@@ -45,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import javax.annotation.Nullable;
 
 /**
  * TestReplicationScenariosAcidTables - test replication for ACID tables
@@ -345,4 +349,93 @@ public class TestReplicationScenariosAcidTables {
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
   }
+
+  @Test
+  public void testAcidBootstrapReplLoadRetryAfterFailure() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets 
stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into t1 values(1)")
+            .run("create table t2 (rank int) partitioned by (name string) 
tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .run("insert into t2 partition(name='bob') values(11)")
+            .run("insert into t2 partition(name='carl') values(10)")
+            .dump(primaryDbName, null);
+
+    WarehouseInstance.Tuple tuple2 = primary
+            .run("use " + primaryDbName)
+            .dump(primaryDbName, null);
+
+    // Inject a behavior so that REPL LOAD fails when it tries to load table "t2".
+    BehaviourInjection<CallerArguments, Boolean> callerVerifier
+            = new BehaviourInjection<CallerArguments, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable CallerArguments args) {
+        injectionPathCalled = true;
+        if (!args.dbName.equalsIgnoreCase(replicatedDbName)) {
+          LOG.warn("Verifier - DB: " + String.valueOf(args.dbName));
+          return false;
+        }
+        if (args.tblName != null) {
+          LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
+          return args.tblName.equals("t1");
+        }
+        return true;
+      }
+    };
+    InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
+
+    List<String> withConfigs = 
Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
+    replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+    callerVerifier.assertInjectionsPerformed(true, false);
+    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the 
behaviour
+
+    replica.run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult("null")
+            .run("show tables")
+            .verifyResults(new String[] { "t1" })
+            .run("select id from t1")
+            .verifyResults(Arrays.asList("1"));
+
+    // Retry with different dump should fail.
+    replica.loadFailure(replicatedDbName, tuple2.dumpLocation);
+
+    // Verify that create table is not called for t1. Only table t2 should be created in the retry.
+    callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable CallerArguments args) {
+        injectionPathCalled = true;
+        if (!args.dbName.equalsIgnoreCase(replicatedDbName)) {
+          LOG.warn("Verifier - DB: " + String.valueOf(args.dbName));
+          return false;
+        }
+        if (args.tblName != null) {
+          LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
+          return args.tblName.equals("t2");
+        }
+        return true;
+      }
+    };
+    InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
+
+    // Retry with the same dump with which it was already loaded should resume the bootstrap load.
+    // This time, it completes by creating just the missing table t2.
+    replica.load(replicatedDbName, tuple.dumpLocation);
+    callerVerifier.assertInjectionsPerformed(true, false);
+    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the 
behaviour
+
+    replica.run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("show tables")
+            .verifyResults(new String[] { "t1", "t2" })
+            .run("select id from t1")
+            .verifyResults(Arrays.asList("1"))
+            .run("select name from t2 order by name")
+            .verifyResults(Arrays.asList("bob", "carl"));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 35437b1..4b3a976 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.util.DependencyResolver;
 import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -53,11 +56,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
@@ -436,8 +441,10 @@ public class TestReplicationScenariosAcrossInstances {
        End of additional steps
     */
 
+    // Reset the ckpt and last repl ID keys to empty strings to allow bootstrap load
     replica.run("show databases")
         .verifyFailure(new String[] { primaryDbName, dbOne, dbTwo })
+        .run("alter database default set dbproperties 
('hive.repl.ckpt.key'='', 'repl.last.id'='')")
         .load("", tuple.dumpLocation)
         .run("show databases")
         .verifyResults(new String[] { "default", primaryDbName, dbOne, dbTwo })
@@ -450,8 +457,9 @@ public class TestReplicationScenariosAcrossInstances {
         .run("use " + dbTwo)
         .run("show tables")
         .verifyResults(new String[] { "t1" });
+
     /*
-      Start of cleanup
+       Start of cleanup
     */
 
     replica.run("drop database " + primaryDbName + " cascade");
@@ -505,8 +513,10 @@ public class TestReplicationScenariosAcrossInstances {
       End of additional steps
     */
 
+    // Reset the ckpt and last repl ID keys to empty strings to allow bootstrap load
     replica.run("show databases")
         .verifyFailure(new String[] { primaryDbName, dbOne, dbTwo })
+        .run("alter database default set dbproperties 
('hive.repl.ckpt.key'='', 'repl.last.id'='')")
         .load("", bootstrapTuple.dumpLocation)
         .run("show databases")
         .verifyResults(new String[] { "default", primaryDbName, dbOne })
@@ -772,6 +782,23 @@ public class TestReplicationScenariosAcrossInstances {
     assertFalse(fs.exists(cSerdesTableDumpLocation));
   }
 
+  private void verifyIfCkptSet(WarehouseInstance wh, String dbName, String 
dumpDir) throws Exception {
+    Database db = wh.getDatabase(replicatedDbName);
+    verifyIfCkptSet(db.getParameters(), dumpDir);
+
+    List<String> tblNames = wh.getAllTables(dbName);
+    for (String tblName : tblNames) {
+      Table tbl = wh.getTable(dbName, tblName);
+      verifyIfCkptSet(tbl.getParameters(), dumpDir);
+      if (tbl.getPartitionKeysSize() != 0) {
+        List<Partition> partitions = wh.getAllPartitions(dbName, tblName);
+        for (Partition ptn : partitions) {
+          verifyIfCkptSet(ptn.getParameters(), dumpDir);
+        }
+      }
+    }
+  }
+
   private void verifyIfCkptSet(Map<String, String> props, String dumpDir) {
     assertTrue(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY));
     assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir));
@@ -786,41 +813,6 @@ public class TestReplicationScenariosAcrossInstances {
   }
 
   @Test
-  public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable {
-    WarehouseInstance.Tuple tuple = primary
-            .run("use " + primaryDbName)
-            .run("create table t1 (id int)")
-            .run("insert into table t1 values (10)")
-            .run("create table t2 (place string) partitioned by (country 
string)")
-            .run("insert into table t2 partition(country='india') values 
('bangalore')")
-            .run("insert into table t2 partition(country='uk') values 
('london')")
-            .run("insert into table t2 partition(country='us') values ('sfo')")
-            .dump(primaryDbName, null);
-
-    replica.load(replicatedDbName, tuple.dumpLocation)
-            .run("use " + replicatedDbName)
-            .run("repl status " + replicatedDbName)
-            .verifyResult(tuple.lastReplicationId)
-            .run("show tables")
-            .verifyResults(new String[] { "t1", "t2" })
-            .run("select country from t2")
-            .verifyResults(Arrays.asList("india", "uk", "us"));
-
-    Database db = replica.getDatabase(replicatedDbName);
-    verifyIfCkptSet(db.getParameters(), tuple.dumpLocation);
-    Table t1 = replica.getTable(replicatedDbName, "t1");
-    verifyIfCkptSet(t1.getParameters(), tuple.dumpLocation);
-    Table t2 = replica.getTable(replicatedDbName, "t2");
-    verifyIfCkptSet(t2.getParameters(), tuple.dumpLocation);
-    Partition india = replica.getPartition(replicatedDbName, "t2", 
Collections.singletonList("india"));
-    verifyIfCkptSet(india.getParameters(), tuple.dumpLocation);
-    Partition us = replica.getPartition(replicatedDbName, "t2", 
Collections.singletonList("us"));
-    verifyIfCkptSet(us.getParameters(), tuple.dumpLocation);
-    Partition uk = replica.getPartition(replicatedDbName, "t2", 
Collections.singletonList("uk"));
-    verifyIfCkptSet(uk.getParameters(), tuple.dumpLocation);
-  }
-
-  @Test
   public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws 
Throwable {
     WarehouseInstance.Tuple tuplePrimary = primary
             .run("use " + primaryDbName)
@@ -944,4 +936,272 @@ public class TestReplicationScenariosAcrossInstances {
 
     replica.run("drop database if exists " + importDbFromReplica + " cascade");
   }
+
+  @Test
+  public void testIfBootstrapReplLoadFailWhenRetryAfterBootstrapComplete() 
throws Throwable {
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create table t1 (id int)")
+            .run("insert into table t1 values (10)")
+            .run("create table t2 (place string) partitioned by (country 
string)")
+            .run("insert into table t2 partition(country='india') values 
('bangalore')")
+            .run("insert into table t2 partition(country='uk') values 
('london')")
+            .run("insert into table t2 partition(country='us') values ('sfo')")
+            .dump(primaryDbName, null);
+
+    replica.load(replicatedDbName, tuple.dumpLocation)
+            .run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("show tables")
+            .verifyResults(new String[] { "t1", "t2" })
+            .run("select id from t1")
+            .verifyResults(Arrays.asList("10"))
+            .run("select country from t2 order by country")
+            .verifyResults(Arrays.asList("india", "uk", "us"));
+    verifyIfCkptSet(replica, replicatedDbName, tuple.dumpLocation);
+
+    WarehouseInstance.Tuple tuple_2 = primary
+            .run("use " + primaryDbName)
+            .dump(primaryDbName, null);
+
+    // Retry with different dump should fail.
+    replica.loadFailure(replicatedDbName, tuple_2.dumpLocation);
+
+    // Retry with same dump with which it was already loaded also fails.
+    replica.loadFailure(replicatedDbName, tuple.dumpLocation);
+
+    // Retry from same dump when the database is empty is also not allowed.
+    replica.run("drop table t1")
+            .run("drop table t2")
+            .loadFailure(replicatedDbName, tuple.dumpLocation);
+  }
+
+  @Test
+  public void testBootstrapReplLoadRetryAfterFailureForTablesAndConstraints() 
throws Throwable {
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create table t1(a string, b string, primary key (a, b) 
disable novalidate rely)")
+            .run("create table t2(a string, b string, foreign key (a, b) 
references t1(a, b) disable novalidate)")
+            .run("create table t3(a string, b string not null disable, unique 
(a) disable)")
+            .dump(primaryDbName, null);
+
+    WarehouseInstance.Tuple tuple2 = primary
+            .run("use " + primaryDbName)
+            .dump(primaryDbName, null);
+
+    // Need to drop the primary DB as the metastore is shared by both primary and replica, so the
+    // constraints conflict when loaded. This is a test-framework issue that needs to be looked into later.
+    primary.run("drop database if exists " + primaryDbName + " cascade");
+
+    // Allow create table only for t1. Create should fail for the rest of the tables, and hence their
+    // constraints are not loaded either.
+    BehaviourInjection<CallerArguments, Boolean> callerVerifier
+            = new BehaviourInjection<CallerArguments, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable CallerArguments args) {
+        injectionPathCalled = true;
+        if (!args.dbName.equalsIgnoreCase(replicatedDbName) || 
(args.constraintTblName != null)) {
+          LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
+                  + " Constraint Table: " + 
String.valueOf(args.constraintTblName));
+          return false;
+        }
+        if (args.tblName != null) {
+          LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
+          return args.tblName.equals("t1");
+        }
+        return true;
+      }
+    };
+    InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
+
+    // Trigger bootstrap load, which creates just table t1; the other tables (t2, t3) and the constraints are not loaded.
+    List<String> withConfigs = 
Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
+    replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the 
behaviour
+    callerVerifier.assertInjectionsPerformed(true, false);
+
+    replica.run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult("null")
+            .run("show tables")
+            .verifyResults(new String[] { "t1" });
+    assertEquals(0, replica.getPrimaryKeyList(replicatedDbName, "t1").size());
+    assertEquals(0, replica.getUniqueConstraintList(replicatedDbName, 
"t3").size());
+    assertEquals(0, replica.getNotNullConstraintList(replicatedDbName, 
"t3").size());
+    assertEquals(0, replica.getForeignKeyList(replicatedDbName, "t2").size());
+
+    // Retry with different dump should fail.
+    replica.loadFailure(replicatedDbName, tuple2.dumpLocation);
+
+    // Verify that create table is not called for table t1 but is called for t2 and t3.
+    // Also, allow constraint creation only on t1 and t3. Foreign key creation on t2 fails.
+    callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable CallerArguments args) {
+        injectionPathCalled = true;
+        if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.funcName 
!= null)) {
+          LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + " Func: " 
+ String.valueOf(args.funcName));
+          return false;
+        }
+        if (args.tblName != null) {
+          LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
+          return (args.tblName.equals("t2") || args.tblName.equals("t3"));
+        }
+        if (args.constraintTblName != null) {
+          LOG.warn("Verifier - Constraint Table: " + 
String.valueOf(args.constraintTblName));
+          return (args.constraintTblName.equals("t1") || 
args.constraintTblName.equals("t3"));
+        }
+        return true;
+      }
+    };
+    InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
+
+    // Retry with the same dump with which it was already loaded should resume the bootstrap load.
+    // This time, it fails when trying to load the foreign key constraints. All other constraints are loaded.
+    replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the 
behaviour
+    callerVerifier.assertInjectionsPerformed(true, false);
+
+    replica.run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult("null")
+            .run("show tables")
+            .verifyResults(new String[] { "t1", "t2", "t3" });
+    assertEquals(2, replica.getPrimaryKeyList(replicatedDbName, "t1").size());
+    assertEquals(1, replica.getUniqueConstraintList(replicatedDbName, 
"t3").size());
+    assertEquals(1, replica.getNotNullConstraintList(replicatedDbName, 
"t3").size());
+    assertEquals(0, replica.getForeignKeyList(replicatedDbName, "t2").size());
+
+    // Verify that there are no create table/function calls. Only the foreign key constraints on table t2 are added.
+    callerVerifier = new BehaviourInjection<CallerArguments, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable CallerArguments args) {
+        injectionPathCalled = true;
+        if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.tblName 
!= null)) {
+          LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
+                  + " Table: " + String.valueOf(args.tblName));
+          return false;
+        }
+        if (args.constraintTblName != null) {
+          LOG.warn("Verifier - Constraint Table: " + 
String.valueOf(args.constraintTblName));
+          return args.constraintTblName.equals("t2");
+        }
+        return true;
+      }
+    };
+    InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
+
+    // Retry with the same dump with which it was already loaded should resume the bootstrap load.
+    // This time, it completes by adding just the foreign key constraints for table t2.
+    replica.load(replicatedDbName, tuple.dumpLocation);
+    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the 
behaviour
+    callerVerifier.assertInjectionsPerformed(true, false);
+
+    replica.run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("show tables")
+            .verifyResults(new String[] { "t1", "t2", "t3" });
+    assertEquals(2, replica.getPrimaryKeyList(replicatedDbName, "t1").size());
+    assertEquals(1, replica.getUniqueConstraintList(replicatedDbName, 
"t3").size());
+    assertEquals(1, replica.getNotNullConstraintList(replicatedDbName, 
"t3").size());
+    assertEquals(2, replica.getForeignKeyList(replicatedDbName, "t2").size());
+  }
+
+  @Test
+  public void testBootstrapReplLoadRetryAfterFailureForPartitions() throws 
Throwable {
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create table t1 (id int)")
+            .run("insert into table t1 values (10)")
+            .run("create table t2 (place string) partitioned by (country 
string)")
+            .run("insert into table t2 partition(country='india') values 
('bangalore')")
+            .run("insert into table t2 partition(country='uk') values 
('london')")
+            .run("insert into table t2 partition(country='us') values ('sfo')")
+            .run("CREATE FUNCTION " + primaryDbName
+                    + ".testFunctionOne as 'hivemall.tools.string.StopwordUDF' 
"
+                    + "using jar  'ivy://io.github.myui:hivemall:0.4.0-2'")
+            .dump(primaryDbName, null);
+
+    WarehouseInstance.Tuple tuple2 = primary
+            .run("use " + primaryDbName)
+            .dump(primaryDbName, null);
+
+    // Inject a behavior where REPL LOAD fails when it tries to load table "t2" partition "uk".
+    // So, tables "t1" and "t2" and partition "india" will exist; the rest are missing as the operation failed.
+    BehaviourInjection<Partition, Partition> getPartitionStub
+            = new BehaviourInjection<Partition, Partition>() {
+      @Nullable
+      @Override
+      public Partition apply(@Nullable Partition ptn) {
+        if (ptn.getValues().get(0).equals("india")) {
+          injectionPathCalled = true;
+          LOG.warn("####getPartition Stub called");
+          return null;
+        }
+        return ptn;
+      }
+    };
+    InjectableBehaviourObjectStore.setGetPartitionBehaviour(getPartitionStub);
+
+    List<String> withConfigs = 
Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
+    replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+    InjectableBehaviourObjectStore.resetGetPartitionBehaviour(); // reset the 
behaviour
+    getPartitionStub.assertInjectionsPerformed(true, false);
+
+    replica.run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult("null")
+            .run("show tables")
+            .verifyResults(new String[] { "t1", "t2" })
+            .run("select id from t1")
+            .verifyResults(Arrays.asList("10"))
+            .run("select country from t2 order by country")
+            .verifyResults(Arrays.asList("india"))
+            .run("show functions like '" + replicatedDbName + "*'")
+            .verifyResult(replicatedDbName + ".testFunctionOne");
+
+    // Retry with different dump should fail.
+    replica.loadFailure(replicatedDbName, tuple2.dumpLocation);
+
+    // Verify that there are no create table/function calls. Only partitions are added.
+    BehaviourInjection<CallerArguments, Boolean> callerVerifier
+            = new BehaviourInjection<CallerArguments, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable CallerArguments args) {
+        if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.tblName 
!= null) || (args.funcName != null)) {
+          injectionPathCalled = true;
+          LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
+                  + " Table: " + String.valueOf(args.tblName)
+                  + " Func: " + String.valueOf(args.funcName));
+          return false;
+        }
+        return true;
+      }
+    };
+    InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
+
+    // Retry with the same dump with which it was already loaded should resume the bootstrap load.
+    // This time, it completes by adding the remaining partitions.
+    replica.load(replicatedDbName, tuple.dumpLocation);
+    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the 
behaviour
+    callerVerifier.assertInjectionsPerformed(false, false);
+
+    replica.run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("show tables")
+            .verifyResults(new String[] { "t1", "t2" })
+            .run("select id from t1")
+            .verifyResults(Arrays.asList("10"))
+            .run("select country from t2 order by country")
+            .verifyResults(Arrays.asList("india", "uk", "us"))
+            .run("show functions like '" + replicatedDbName + "*'")
+            .verifyResult(replicatedDbName + ".testFunctionOne");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 79f145c..fc812ad 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -30,9 +30,19 @@ import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
@@ -130,6 +140,8 @@ public class WarehouseInstance implements Closeable {
     if 
(!hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER).equals("org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"))
 {
       hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, 
"false");
     }
+    hiveConf.set(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL.varname,
+            "org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore");
     System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
     System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
 
@@ -257,12 +269,19 @@ public class WarehouseInstance implements Closeable {
   }
 
   WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation) 
throws Throwable {
-    runFailure("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + 
dumpLocation + "'");
-    printOutput();
     runFailure("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + 
"'");
     return this;
   }
 
+  WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation, 
List<String> withClauseOptions)
+          throws Throwable {
+    String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + 
dumpLocation + "'";
+    if (!withClauseOptions.isEmpty()) {
+      replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + 
")";
+    }
+    return runFailure(replLoadCmd);
+  }
+
   WarehouseInstance verifyResult(String data) throws IOException {
     verifyResults(data == null ? new String[] {} : new String[] { data });
     return this;
@@ -331,15 +350,56 @@ public class WarehouseInstance implements Closeable {
   }
 
   public Database getDatabase(String dbName) throws Exception {
-    return client.getDatabase(dbName);
+    try {
+      return client.getDatabase(dbName);
+    } catch (NoSuchObjectException e) {
+      return null;
+    }
+  }
+
+  public List<String> getAllTables(String dbName) throws Exception {
+    return client.getAllTables(dbName);
   }
 
   public Table getTable(String dbName, String tableName) throws Exception {
-    return client.getTable(dbName, tableName);
+    try {
+      return client.getTable(dbName, tableName);
+    } catch (NoSuchObjectException e) {
+      return null;
+    }
+  }
+
+  public List<Partition> getAllPartitions(String dbName, String tableName) 
throws Exception {
+    try {
+      return client.listPartitions(dbName, tableName, Short.MAX_VALUE);
+    } catch (NoSuchObjectException e) {
+      return null;
+    }
   }
 
   public Partition getPartition(String dbName, String tableName, List<String> 
partValues) throws Exception {
-    return client.getPartition(dbName, tableName, partValues);
+    try {
+      return client.getPartition(dbName, tableName, partValues);
+    } catch (NoSuchObjectException e) {
+      return null;
+    }
+  }
+
+  public List<SQLPrimaryKey> getPrimaryKeyList(String dbName, String tblName) 
throws Exception {
+    return client.getPrimaryKeys(new PrimaryKeysRequest(dbName, tblName));
+  }
+
+  public List<SQLForeignKey> getForeignKeyList(String dbName, String tblName) 
throws Exception {
+    return client.getForeignKeys(new ForeignKeysRequest(null, null, dbName, 
tblName));
+  }
+
+  public List<SQLUniqueConstraint> getUniqueConstraintList(String dbName, 
String tblName) throws Exception {
+    return client.getUniqueConstraints(new 
UniqueConstraintsRequest(Warehouse.DEFAULT_CATALOG_NAME, dbName, tblName));
+  }
+
+  public List<SQLNotNullConstraint> getNotNullConstraintList(String dbName, 
String tblName) throws Exception {
+    return client.getNotNullConstraints(
+            new NotNullConstraintsRequest(Warehouse.DEFAULT_CATALOG_NAME, 
dbName, tblName));
   }
 
   ReplicationV1CompatRule getReplivationV1CompatRule(List<String> testsToSkip) 
{

http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
index cbec3ad..18a8304 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
@@ -18,23 +18,74 @@
 package org.apache.hadoop.hive.ql.exec.repl;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 
 
 public class ReplUtils {
 
   public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";
 
+  /**
+   * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
+   */
+  public enum ReplLoadOpType {
+    LOAD_NEW, LOAD_SKIP, LOAD_REPLACE
+  }
+
+  public static Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(
+          Table table, List<Map<String, String>> partitions) throws 
SemanticException {
+    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>();
+    int partPrefixLength = 0;
+    if (partitions.size() > 0) {
+      partPrefixLength = partitions.get(0).size();
+      // Pick the length of the first ptn; we expect all listed ptns to have the same number of key-vals.
+    }
+    List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>();
+    for (Map<String, String> ptn : partitions) {
+      // convert each key-value-map to appropriate expression.
+      ExprNodeGenericFuncDesc expr = null;
+      for (Map.Entry<String, String> kvp : ptn.entrySet()) {
+        String key = kvp.getKey();
+        Object val = kvp.getValue();
+        String type = table.getPartColByName(key).getType();
+        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
+        ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, 
true);
+        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
+                "=", column, new 
ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val));
+        expr = (expr == null) ? op : 
DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
+      }
+      if (expr != null) {
+        partitionDesc.add(expr);
+      }
+    }
+    if (partitionDesc.size() > 0) {
+      partSpecs.put(partPrefixLength, partitionDesc);
+    }
+    return partSpecs;
+  }
+
   public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, 
ReplLogger replLogger, HiveConf conf)
           throws SemanticException {
     ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, 
tableDesc.getTableName(), tableDesc.tableType());
@@ -55,4 +106,18 @@ public class ReplUtils {
     }
     return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), 
alterTblDesc), conf);
   }
+
+  public static boolean replCkptStatus(String dbName, Map<String, String> 
props, String dumpRoot)
+          throws InvalidOperationException {
+    // If the ckpt property is not set or is empty, bootstrap was not run on this object.
+    if ((props != null) && props.containsKey(REPL_CHECKPOINT_KEY) && 
!props.get(REPL_CHECKPOINT_KEY).isEmpty()) {
+      if (props.get(REPL_CHECKPOINT_KEY).equals(dumpRoot)) {
+        return true;
+      }
+      throw new InvalidOperationException("REPL LOAD with Dump: " + dumpRoot
+              + " is not allowed as the target DB: " + dbName
+              + " is already bootstrap loaded by another Dump " + 
props.get(REPL_CHECKPOINT_KEY));
+    }
+    return false;
+  }
 }
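
The LoadDatabase hunk further below consumes replCkptStatus() to pick one of the new ReplLoadOpType
values. A simplified, self-contained sketch of that decision (illustrative only: the
"already bootstrapped" last-repl-id check is omitted and the helper names are stand-ins):

import java.util.Map;

public class LoadDbTypeSketch {
  enum ReplLoadOpType { LOAD_NEW, LOAD_SKIP, LOAD_REPLACE }

  // dbProps == null stands for "target DB does not exist"; dbEmpty stands for
  // "no tables or functions exist in the target DB".
  static ReplLoadOpType getLoadDbType(Map<String, String> dbProps, boolean dbEmpty, String dumpRoot) {
    if (dbProps == null) {
      return ReplLoadOpType.LOAD_NEW;       // create the database from scratch
    }
    if (ckptMatchesDump(dbProps, dumpRoot)) {
      return ReplLoadOpType.LOAD_SKIP;      // this dump already bootstrapped the DB object
    }
    if (dbEmpty) {
      return ReplLoadOpType.LOAD_REPLACE;   // empty pre-created DB: alter it in place
    }
    throw new IllegalStateException("Bootstrap REPL LOAD is not allowed on a non-empty database");
  }

  // Stand-in for ReplUtils.replCkptStatus(): match, no checkpoint, or reject another dump's checkpoint.
  static boolean ckptMatchesDump(Map<String, String> props, String dumpRoot) {
    String ckpt = props.get("hive.repl.ckpt.key");
    if (ckpt == null || ckpt.isEmpty()) {
      return false;                         // never bootstrap loaded
    }
    if (ckpt.equals(dumpRoot)) {
      return true;                          // already loaded by this dump
    }
    throw new IllegalStateException("target already bootstrap loaded by another dump: " + ckpt);
  }

  public static void main(String[] args) {
    System.out.println(getLoadDbType(null, true, "/staging/dump-1"));      // LOAD_NEW
    System.out.println(getLoadDbType(Map.of("hive.repl.ckpt.key", "/staging/dump-1"),
        true, "/staging/dump-1"));                                         // LOAD_SKIP
    System.out.println(getLoadDbType(Map.of(), true, "/staging/dump-1"));  // LOAD_REPLACE
  }
}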

http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
index 97917f8..76fb2a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
@@ -112,8 +112,10 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
             loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, 
scope));
           }
           work.updateDbEventState(dbEvent.toState());
-          scope.database = true;
-          scope.rootTasks.addAll(dbTracker.tasks());
+          if (dbTracker.hasTasks()) {
+            scope.rootTasks.addAll(dbTracker.tasks());
+            scope.database = true;
+          }
           dbTracker.debugLog("database");
           break;
         case Table: {
@@ -129,11 +131,11 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
           LoadTable loadTable = new LoadTable(tableEvent, context, 
iterator.replLogger(),
                                               tableContext, loadTaskTracker);
           tableTracker = loadTable.tasks();
-          if (!scope.database) {
+          setUpDependencies(dbTracker, tableTracker);
+          if (!scope.database && tableTracker.hasTasks()) {
             scope.rootTasks.addAll(tableTracker.tasks());
             scope.table = true;
           }
-          setUpDependencies(dbTracker, tableTracker);
           /*
             for table replication if we reach the max number of tasks then for 
the next run we will
             try to reload the same table again, this is mainly for ease of 
understanding the code
@@ -285,9 +287,15 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
       This sets up dependencies such that a child task is dependant on the 
parent to be complete.
    */
   private void setUpDependencies(TaskTracker parentTasks, TaskTracker 
childTasks) {
-    for (Task<? extends Serializable> parentTask : parentTasks.tasks()) {
+    if (parentTasks.hasTasks()) {
+      for (Task<? extends Serializable> parentTask : parentTasks.tasks()) {
+        for (Task<? extends Serializable> childTask : childTasks.tasks()) {
+          parentTask.addDependentTask(childTask);
+        }
+      }
+    } else {
       for (Task<? extends Serializable> childTask : childTasks.tasks()) {
-        parentTask.addDependentTask(childTask);
+        parentTasks.addTask(childTask);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index 0fabf5a..d203ae4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -54,7 +54,7 @@ public class FSTableEvent implements TableEvent {
   }
 
   public boolean shouldNotReplicate() {
-    ReplicationSpec spec = metadata.getReplicationSpec();
+    ReplicationSpec spec = replicationSpec();
     return spec.isNoop() || !spec.isInReplicationScope();
   }
 
@@ -69,7 +69,7 @@ public class FSTableEvent implements TableEvent {
       Table table = new Table(metadata.getTable());
       ImportTableDesc tableDesc =
           new ImportTableDesc(StringUtils.isBlank(dbName) ? table.getDbName() 
: dbName, table);
-      tableDesc.setReplicationSpec(metadata.getReplicationSpec());
+      tableDesc.setReplicationSpec(replicationSpec());
       return tableDesc;
     } catch (Exception e) {
       throw new SemanticException(e);
@@ -122,7 +122,7 @@ public class FSTableEvent implements TableEvent {
       partDesc.setSortCols(partition.getSd().getSortCols());
       partDesc.setLocation(new Path(fromPath,
           Warehouse.makePartName(tblDesc.getPartCols(), 
partition.getValues())).toString());
-      partsDesc.setReplicationSpec(metadata.getReplicationSpec());
+      partsDesc.setReplicationSpec(replicationSpec());
       return partsDesc;
     } catch (Exception e) {
       throw new SemanticException(e);

http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
index de17d70..26f4892 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
@@ -17,8 +17,20 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage;
+import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
@@ -50,6 +62,7 @@ public class LoadConstraint {
   private final ConstraintEvent event;
   private final String dbNameToLoadIn;
   private final TaskTracker tracker;
+  private final MessageDeserializer deserializer = 
MessageFactory.getInstance().getDeserializer();
 
   public LoadConstraint(Context context, ConstraintEvent event, String 
dbNameToLoadIn,
       TaskTracker existingTracker) {
@@ -73,7 +86,7 @@ public class LoadConstraint {
       String nnsString = json.getString("nns");
       List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends 
Serializable>>();
 
-      if (pksString != null && !pksString.isEmpty()) {
+      if (pksString != null && !pksString.isEmpty() && 
!isPrimaryKeysAlreadyLoaded(pksString)) {
         AddPrimaryKeyHandler pkHandler = new AddPrimaryKeyHandler();
         DumpMetaData pkDumpMetaData = new DumpMetaData(fromPath, 
DumpType.EVENT_ADD_PRIMARYKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
             context.hiveConf);
@@ -84,7 +97,7 @@ public class LoadConstraint {
                 context.hiveDb, context.nestedContext, LOG)));
       }
 
-      if (uksString != null && !uksString.isEmpty()) {
+      if (uksString != null && !uksString.isEmpty() && 
!isUniqueConstraintsAlreadyLoaded(uksString)) {
         AddUniqueConstraintHandler ukHandler = new 
AddUniqueConstraintHandler();
         DumpMetaData ukDumpMetaData = new DumpMetaData(fromPath, 
DumpType.EVENT_ADD_UNIQUECONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
             context.hiveConf);
@@ -95,7 +108,7 @@ public class LoadConstraint {
                 context.hiveDb, context.nestedContext, LOG)));
       }
 
-      if (nnsString != null && !nnsString.isEmpty()) {
+      if (nnsString != null && !nnsString.isEmpty() && 
!isNotNullConstraintsAlreadyLoaded(nnsString)) {
         AddNotNullConstraintHandler nnHandler = new 
AddNotNullConstraintHandler();
         DumpMetaData nnDumpMetaData = new DumpMetaData(fromPath, 
DumpType.EVENT_ADD_NOTNULLCONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
             context.hiveConf);
@@ -106,7 +119,7 @@ public class LoadConstraint {
                 context.hiveDb, context.nestedContext, LOG)));
       }
 
-      if (fksString != null && !fksString.isEmpty()) {
+      if (fksString != null && !fksString.isEmpty() && 
!isForeignKeysAlreadyLoaded(fksString)) {
         AddForeignKeyHandler fkHandler = new AddForeignKeyHandler();
         DumpMetaData fkDumpMetaData = new DumpMetaData(fromPath, 
DumpType.EVENT_ADD_FOREIGNKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
             context.hiveConf);
@@ -124,4 +137,72 @@ public class LoadConstraint {
     }
   }
 
+  private boolean isPrimaryKeysAlreadyLoaded(String pksMsgString) throws 
Exception {
+    AddPrimaryKeyMessage msg = 
deserializer.getAddPrimaryKeyMessage(pksMsgString);
+    List<SQLPrimaryKey> pksInMsg = msg.getPrimaryKeys();
+    if (pksInMsg.isEmpty()) {
+      return true;
+    }
+
+    String dbName = StringUtils.isBlank(dbNameToLoadIn) ? 
pksInMsg.get(0).getTable_db() : dbNameToLoadIn;
+    List<SQLPrimaryKey> pks;
+    try {
+      pks = context.hiveDb.getPrimaryKeyList(dbName, 
pksInMsg.get(0).getTable_name());
+    } catch (NoSuchObjectException e) {
+      return false;
+    }
+    return ((pks != null) && !pks.isEmpty());
+  }
+
+  private boolean isForeignKeysAlreadyLoaded(String fksMsgString) throws 
Exception {
+    AddForeignKeyMessage msg = 
deserializer.getAddForeignKeyMessage(fksMsgString);
+    List<SQLForeignKey> fksInMsg = msg.getForeignKeys();
+    if (fksInMsg.isEmpty()) {
+      return true;
+    }
+
+    String dbName = StringUtils.isBlank(dbNameToLoadIn) ? 
fksInMsg.get(0).getFktable_db() : dbNameToLoadIn;
+    List<SQLForeignKey> fks;
+    try {
+      fks = context.hiveDb.getForeignKeyList(dbName, 
fksInMsg.get(0).getFktable_name());
+    } catch (NoSuchObjectException e) {
+      return false;
+    }
+    return ((fks != null) && !fks.isEmpty());
+  }
+
+  private boolean isUniqueConstraintsAlreadyLoaded(String uksMsgString) throws 
Exception {
+    AddUniqueConstraintMessage msg = 
deserializer.getAddUniqueConstraintMessage(uksMsgString);
+    List<SQLUniqueConstraint> uksInMsg = msg.getUniqueConstraints();
+    if (uksInMsg.isEmpty()) {
+      return true;
+    }
+
+    String dbName = StringUtils.isBlank(dbNameToLoadIn) ? 
uksInMsg.get(0).getTable_db() : dbNameToLoadIn;
+    List<SQLUniqueConstraint> uks;
+    try {
+      uks = context.hiveDb.getUniqueConstraintList(dbName, 
uksInMsg.get(0).getTable_name());
+    } catch (NoSuchObjectException e) {
+      return false;
+    }
+    return ((uks != null) && !uks.isEmpty());
+  }
+
+  private boolean isNotNullConstraintsAlreadyLoaded(String nnsMsgString) 
throws Exception {
+    AddNotNullConstraintMessage msg = 
deserializer.getAddNotNullConstraintMessage(nnsMsgString);
+    List<SQLNotNullConstraint> nnsInMsg = msg.getNotNullConstraints();
+    if (nnsInMsg.isEmpty()) {
+      return true;
+    }
+
+    String dbName = StringUtils.isBlank(dbNameToLoadIn) ? 
nnsInMsg.get(0).getTable_db() : dbNameToLoadIn;
+    List<SQLNotNullConstraint> nns;
+    try {
+      nns = context.hiveDb.getNotNullConstraintList(dbName, 
nnsInMsg.get(0).getTable_name());
+    } catch (NoSuchObjectException e) {
+      return false;
+    }
+    return ((nns != null) && !nns.isEmpty());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index c5f2779..0270d2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.PrincipalDesc;
 import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -58,11 +59,23 @@ public class LoadDatabase {
   public TaskTracker tasks() throws SemanticException {
     try {
       Database dbInMetadata = readDbMetadata();
-      Task<? extends Serializable> dbRootTask = 
existEmptyDb(dbInMetadata.getName())
-          ? alterDbTask(dbInMetadata, context.hiveConf)
-          : createDbTask(dbInMetadata);
-      dbRootTask.addDependentTask(setOwnerInfoTask(dbInMetadata));
-      tracker.addTask(dbRootTask);
+      String dbName = dbInMetadata.getName();
+      Task<? extends Serializable> dbRootTask = null;
+      ReplLoadOpType loadDbType = getLoadDbType(dbName);
+      switch (loadDbType) {
+        case LOAD_NEW:
+          dbRootTask = createDbTask(dbInMetadata);
+          break;
+        case LOAD_REPLACE:
+          dbRootTask = alterDbTask(dbInMetadata);
+          break;
+        default:
+          break;
+      }
+      if (dbRootTask != null) {
+        dbRootTask.addDependentTask(setOwnerInfoTask(dbInMetadata));
+        tracker.addTask(dbRootTask);
+      }
       return tracker;
     } catch (Exception e) {
       throw new SemanticException(e);
@@ -73,24 +86,44 @@ public class LoadDatabase {
     return event.dbInMetadata(dbNameToLoadIn);
   }
 
+  private ReplLoadOpType getLoadDbType(String dbName) throws 
InvalidOperationException, HiveException {
+    Database db = context.hiveDb.getDatabase(dbName);
+    if (db == null) {
+      return ReplLoadOpType.LOAD_NEW;
+    }
+    if (isDbAlreadyBootstrapped(db)) {
+      throw new InvalidOperationException("Bootstrap REPL LOAD is not allowed 
on Database: " + dbName
+              + " as it was already done.");
+    }
+    if (ReplUtils.replCkptStatus(dbName, db.getParameters(), 
context.dumpDirectory)) {
+      return ReplLoadOpType.LOAD_SKIP;
+    }
+    if (isDbEmpty(dbName)) {
+      return ReplLoadOpType.LOAD_REPLACE;
+    }
+    throw new InvalidOperationException("Bootstrap REPL LOAD is not allowed on 
Database: " + dbName
+                    + " as it is not empty. One or more tables/functions 
exist.");
+  }
+
+  private boolean isDbAlreadyBootstrapped(Database db) {
+    Map<String, String> props = db.getParameters();
+    return ((props != null)
+            && props.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString())
+            && 
!props.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()).isEmpty());
+  }
+
+  private boolean isDbEmpty(String dbName) throws HiveException {
+    List<String> allTables = context.hiveDb.getAllTables(dbName);
+    List<String> allFunctions = context.hiveDb.getFunctions(dbName, "*");
+    return allTables.isEmpty() && allFunctions.isEmpty();
+  }
+
   private Task<? extends Serializable> createDbTask(Database dbObj) {
     CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc();
     createDbDesc.setName(dbObj.getName());
     createDbDesc.setComment(dbObj.getDescription());
+    createDbDesc.setDatabaseProperties(updateDbProps(dbObj, 
context.dumpDirectory));
 
-    /*
-    explicitly remove the setting of last.repl.id from the db object 
parameters as loadTask is going
-    to run multiple times and explicit logic is in place which prevents 
updates to tables when db level
-    last repl id is set and we create a AlterDatabaseTask at the end of 
processing a database.
-     */
-    Map<String, String> parameters = new HashMap<>(dbObj.getParameters());
-    parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString());
-
-    // Add the checkpoint key to the Database binding it to current dump 
directory.
-    // So, if retry using same dump, we shall skip Database object update.
-    parameters.put(ReplUtils.REPL_CHECKPOINT_KEY, context.dumpDirectory);
-
-    createDbDesc.setDatabaseProperties(parameters);
     // note that we do not set location - for repl load, we want that 
auto-created.
     createDbDesc.setIfNotExists(false);
     // If it exists, we want this to be an error condition. Repl Load is not 
intended to replace a
@@ -100,11 +133,8 @@ public class LoadDatabase {
     return TaskFactory.get(work, context.hiveConf);
   }
 
-  private static Task<? extends Serializable> alterDbTask(Database dbObj, 
HiveConf hiveConf) {
-    AlterDatabaseDesc alterDbDesc =
-        new AlterDatabaseDesc(dbObj.getName(), dbObj.getParameters(), null);
-    DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc);
-    return TaskFactory.get(work, hiveConf);
+  private Task<? extends Serializable> alterDbTask(Database dbObj) {
+    return alterDbTask(dbObj.getName(), updateDbProps(dbObj, 
context.dumpDirectory), context.hiveConf);
   }
 
   private Task<? extends Serializable> setOwnerInfoTask(Database dbObj) {
@@ -115,18 +145,27 @@ public class LoadDatabase {
     return TaskFactory.get(work, context.hiveConf);
   }
 
-  private boolean existEmptyDb(String dbName) throws InvalidOperationException, HiveException {
-    Database db = context.hiveDb.getDatabase(dbName);
-    if (db == null) {
-      return false;
-    }
-    List<String> allTables = context.hiveDb.getAllTables(dbName);
-    List<String> allFunctions = context.hiveDb.getFunctions(dbName, "*");
-    if (allTables.isEmpty() && allFunctions.isEmpty()) {
-      return true;
-    }
-    throw new InvalidOperationException(
-        "Database " + db.getName() + " is not empty. One or more tables/functions exist.");
+  private static Map<String, String> updateDbProps(Database dbObj, String dumpDirectory) {
+    /*
+    explicitly remove the setting of last.repl.id from the db object parameters as loadTask is going
+    to run multiple times and explicit logic is in place which prevents updates to tables when db level
+    last repl id is set and we create a AlterDatabaseTask at the end of processing a database.
+     */
+    Map<String, String> parameters = new HashMap<>(dbObj.getParameters());
+    parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+
+    // Add the checkpoint key to the Database binding it to current dump directory.
+    // So, if retry using same dump, we shall skip Database object update.
+    parameters.put(ReplUtils.REPL_CHECKPOINT_KEY, dumpDirectory);
+    return parameters;
+  }
+
+  private static Task<? extends Serializable> alterDbTask(String dbName, Map<String, String> props,
+                                                          HiveConf hiveConf) {
+    AlterDatabaseDesc alterDbDesc =
+            new AlterDatabaseDesc(dbName, props, null);
+    DDLWork work = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc);
+    return TaskFactory.get(work, hiveConf);
   }
 
   public static class AlterDatabase extends LoadDatabase {
@@ -138,7 +177,8 @@ public class LoadDatabase {
 
     @Override
     public TaskTracker tasks() throws SemanticException {
-      tracker.addTask(alterDbTask(readDbMetadata(), context.hiveConf));
+      Database dbObj = readDbMetadata();
+      tracker.addTask(alterDbTask(dbObj.getName(), dbObj.getParameters(), context.hiveConf));
       return tracker;
     }
   }
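Note for readers following the checkpoint logic above: updateDbProps() drops the replicated last-state id and stamps the database parameters with the dump directory, so a retried REPL LOAD from the same dump can recognize an already-bootstrapped database. A minimal standalone sketch of that idea follows; the class, constant names and property keys here are illustrative stand-ins, not the actual ReplUtils/ReplicationSpec API.

    import java.util.HashMap;
    import java.util.Map;

    public class CheckpointPropsSketch {
      // Illustrative property keys; the real keys live in ReplUtils and ReplicationSpec.
      static final String CKPT_KEY = "hive.repl.ckpt.key";
      static final String LAST_REPL_ID_KEY = "repl.last.id";

      // Copy the database parameters, drop the replicated state id and bind the
      // checkpoint to the dump directory that produced this bootstrap load.
      static Map<String, String> updateDbProps(Map<String, String> dbParams, String dumpDirectory) {
        Map<String, String> parameters = new HashMap<>(dbParams);
        parameters.remove(LAST_REPL_ID_KEY);
        parameters.put(CKPT_KEY, dumpDirectory);
        return parameters;
      }

      public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put(LAST_REPL_ID_KEY, "42");
        System.out.println(updateDbProps(params, "/tmp/dump/next"));
      }
    }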

http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
index c100344..b886ff4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -26,9 +30,11 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
 import org.slf4j.Logger;
@@ -71,6 +77,9 @@ public class LoadFunction {
     Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
 
     try {
+      if (isFunctionAlreadyLoaded(fromPath)) {
+        return tracker;
+      }
       CreateFunctionHandler handler = new CreateFunctionHandler();
       List<Task<? extends Serializable>> tasks = handler.handle(
           new MessageHandler.Context(
@@ -85,4 +94,21 @@ public class LoadFunction {
     }
   }
 
+  private boolean isFunctionAlreadyLoaded(Path funcDumpRoot) throws HiveException, IOException {
+    Path metadataPath = new Path(funcDumpRoot, EximUtil.METADATA_NAME);
+    FileSystem fs = FileSystem.get(metadataPath.toUri(), context.hiveConf);
+    MetaData metadata = EximUtil.readMetaData(fs, metadataPath);
+    Function function;
+    try {
+      String dbName = StringUtils.isBlank(dbNameToLoadIn) ? metadata.function.getDbName() : dbNameToLoadIn;
+      function = context.hiveDb.getFunction(dbName, metadata.function.getFunctionName());
+    } catch (HiveException e) {
+      if (e.getCause() instanceof NoSuchObjectException) {
+        return false;
+      }
+      throw e;
+    }
+    return (function != null);
+  }
+
 }
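The new isFunctionAlreadyLoaded() above reads the function name from the dump metadata and treats a metastore lookup miss as "not loaded yet", while re-throwing any other failure. A simplified sketch of that lookup pattern, using hypothetical exception and store types in place of HiveException, NoSuchObjectException and the Hive metastore client:

    public class FunctionLoadedCheckSketch {
      // Hypothetical "not found" marker, standing in for NoSuchObjectException.
      static class NotFoundException extends Exception {}
      // Hypothetical wrapper, standing in for HiveException.
      static class LookupException extends Exception {
        LookupException(Throwable cause) { super(cause); }
      }

      interface FunctionStore {
        Object getFunction(String dbName, String functionName) throws LookupException;
      }

      // Mirrors the pattern in the diff: a lookup miss means "load it", any other
      // failure is re-thrown, and a non-null result means "skip the load".
      static boolean isFunctionAlreadyLoaded(FunctionStore store, String dbName, String functionName)
          throws LookupException {
        Object function;
        try {
          function = store.getFunction(dbName, functionName);
        } catch (LookupException e) {
          if (e.getCause() instanceof NotFoundException) {
            return false;
          }
          throw e;
        }
        return function != null;
      }
    }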

http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 870f70a..f64bc26 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
@@ -37,11 +39,12 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -167,6 +170,13 @@ public class LoadPartitions {
   }
 
   private TaskTracker forNewTable() throws Exception {
+    Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+    // If table doesn't exist, allow creating a new one only if the database state is older than the update.
+    // This in-turn applicable for partitions creation as well.
+    if ((parentDb != null) && (!event.replicationSpec().allowReplacementInto(parentDb.getParameters()))) {
+      return tracker;
+    }
+
     Iterator<AddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
     while (iterator.hasNext() && tracker.canAddMoreTasks()) {
       AddPartitionDesc currentPartitionDesc = iterator.next();
@@ -174,13 +184,14 @@ public class LoadPartitions {
        the currentPartitionDesc cannot be inlined as we need the hasNext() to be evaluated post the
        current retrieved lastReplicatedPartition
       */
-      addPartition(iterator.hasNext(), currentPartitionDesc);
+      addPartition(iterator.hasNext(), currentPartitionDesc, null);
     }
     return tracker;
   }
 
-  private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartitionDesc) throws Exception {
-    tracker.addTask(tasksForAddPartition(table, addPartitionDesc));
+  private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
+          throws Exception {
+    tracker.addTask(tasksForAddPartition(table, addPartitionDesc, ptnRootTask));
     if (hasMorePartitions && !tracker.canAddMoreTasks()) {
       ReplicationState currentReplicationState =
           new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc));
@@ -191,29 +202,36 @@ public class LoadPartitions {
   /**
    * returns the root task for adding a partition
    */
-  private Task<? extends Serializable> tasksForAddPartition(Table table,
-      AddPartitionDesc addPartitionDesc) throws MetaException, IOException, HiveException {
+  private Task<?> tasksForAddPartition(Table table, AddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
+          throws MetaException, IOException, HiveException {
+    Task<?> addPartTask = TaskFactory.get(
+            new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc),
+            context.hiveConf
+    );
+    if (event.replicationSpec().isMetadataOnly()) {
+      if (ptnRootTask == null) {
+        ptnRootTask = addPartTask;
+      } else {
+        ptnRootTask.addDependentTask(addPartTask);
+      }
+      return ptnRootTask;
+    }
+
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
     Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
     Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
     partSpec.setLocation(replicaWarehousePartitionLocation.toString());
     LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
-        + partSpecToString(partSpec.getPartSpec()) + " with source location: "
-        + partSpec.getLocation());
-    Path tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
+            + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+            + partSpec.getLocation());
 
+    Path tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
     Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
         event.replicationSpec(),
         sourceWarehousePartitionLocation,
         tmpPath,
         context.hiveConf
     );
-
-    Task<?> addPartTask = TaskFactory.get(
-        new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc),
-        context.hiveConf
-    );
-
     Task<?> movePartitionTask = movePartitionTask(table, partSpec, tmpPath);
 
     // Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for
@@ -225,11 +243,16 @@ public class LoadPartitions {
             context.hiveConf
     );
 
+    if (ptnRootTask == null) {
+      ptnRootTask = copyTask;
+    } else {
+      ptnRootTask.addDependentTask(copyTask);
+    }
     copyTask.addDependentTask(addPartTask);
     addPartTask.addDependentTask(movePartitionTask);
     movePartitionTask.addDependentTask(ckptTask);
 
-    return copyTask;
+    return ptnRootTask;
   }
 
   /**
@@ -271,17 +294,18 @@ public class LoadPartitions {
     }
   }
 
-  private Task<? extends Serializable> alterSinglePartition(AddPartitionDesc desc,
-      ReplicationSpec replicationSpec, Partition ptn) {
-    desc.setReplaceMode(true);
-    if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) {
-      desc.setReplicationSpec(replicationSpec);
+  private Task<?> dropPartitionTask(Table table, Map<String, String> partSpec) throws SemanticException {
+    Task<DDLWork> dropPtnTask = null;
+    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecsExpr =
+            ReplUtils.genPartSpecs(table, Collections.singletonList(partSpec));
+    if (partSpecsExpr.size() > 0) {
+      DropTableDesc dropPtnDesc = new DropTableDesc(table.getFullyQualifiedName(),
+              partSpecsExpr, null, true, event.replicationSpec());
+      dropPtnTask = TaskFactory.get(
+              new DDLWork(new HashSet<>(), new HashSet<>(), dropPtnDesc), context.hiveConf
+      );
     }
-    desc.getPartition(0).setLocation(ptn.getLocation()); // use existing location
-    return TaskFactory.get(
-        new DDLWork(new HashSet<>(), new HashSet<>(), desc),
-        context.hiveConf
-    );
+    return dropPtnTask;
   }
 
   private TaskTracker forExistingTable(AddPartitionDesc lastPartitionReplicated) throws Exception {
@@ -293,7 +317,6 @@ public class LoadPartitions {
           StringUtils.mapToString(lastReplicatedPartSpec));
     }
 
-    ReplicationSpec replicationSpec = event.replicationSpec();
     Iterator<AddPartitionDesc> partitionIterator = event.partitionDescriptions(tableDesc).iterator();
     while (!encounteredTheLastReplicatedPartition && partitionIterator.hasNext()) {
       AddPartitionDesc addPartitionDesc = partitionIterator.next();
@@ -304,33 +327,33 @@ public class LoadPartitions {
     while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) {
       AddPartitionDesc addPartitionDesc = partitionIterator.next();
       Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
-      Partition ptn = context.hiveDb.getPartition(table, partSpec, false);
-      if (ptn == null) {
-        if (!replicationSpec.isMetadataOnly()) {
-          addPartition(partitionIterator.hasNext(), addPartitionDesc);
-        }
-      } else {
-        // If replicating, then the partition already existing means we need to replace, maybe, if
-        // the destination ptn's repl.last.id is older than the replacement's.
-        if (replicationSpec.allowReplacementInto(ptn.getParameters())) {
-          if (replicationSpec.isMetadataOnly()) {
-            tracker.addTask(alterSinglePartition(addPartitionDesc, replicationSpec, ptn));
-            if (!tracker.canAddMoreTasks()) {
-              tracker.setReplicationState(
-                  new ReplicationState(
-                      new PartitionState(table.getTableName(), addPartitionDesc)
-                  )
-              );
-            }
-          } else {
-            addPartition(partitionIterator.hasNext(), addPartitionDesc);
-          }
-        } else {
-          // ignore this ptn, do nothing, not an error.
-        }
+      Task<?> ptnRootTask = null;
+      ReplLoadOpType loadPtnType = getLoadPartitionType(partSpec);
+      switch (loadPtnType) {
+        case LOAD_NEW:
+          break;
+        case LOAD_REPLACE:
+          ptnRootTask = dropPartitionTask(table, partSpec);
+          break;
+        case LOAD_SKIP:
+          continue;
+        default:
+          break;
       }
+      addPartition(partitionIterator.hasNext(), addPartitionDesc, ptnRootTask);
     }
     return tracker;
   }
+
+  private ReplLoadOpType getLoadPartitionType(Map<String, String> partSpec) throws InvalidOperationException, HiveException {
+    Partition ptn = context.hiveDb.getPartition(table, partSpec, false);
+    if (ptn == null) {
+      return ReplLoadOpType.LOAD_NEW;
+    }
+    if (ReplUtils.replCkptStatus(tableContext.dbNameToLoadIn, ptn.getParameters(), context.dumpDirectory)) {
+      return ReplLoadOpType.LOAD_SKIP;
+    }
+    return ReplLoadOpType.LOAD_REPLACE;
+  }
 }
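The partition path above now funnels every partition through a single decision: LOAD_NEW when the partition is absent, LOAD_SKIP when its checkpoint already points at the current dump directory, and LOAD_REPLACE (drop, then re-add) otherwise. A compact sketch of that decision with simplified types; the checkpoint key name and method are illustrative, the real check is ReplUtils.replCkptStatus():

    import java.util.Map;

    public class LoadTypeSketch {
      enum ReplLoadOpType { LOAD_NEW, LOAD_REPLACE, LOAD_SKIP }

      // Illustrative checkpoint property key.
      static final String CKPT_KEY = "hive.repl.ckpt.key";

      // existingParams is null when the partition (or table) does not exist on the target yet.
      static ReplLoadOpType getLoadType(Map<String, String> existingParams, String dumpDirectory) {
        if (existingParams == null) {
          return ReplLoadOpType.LOAD_NEW;       // nothing on target: create it
        }
        if (dumpDirectory.equals(existingParams.get(CKPT_KEY))) {
          return ReplLoadOpType.LOAD_SKIP;      // same dump already applied: idempotent retry
        }
        return ReplLoadOpType.LOAD_REPLACE;     // stale copy: drop and re-load
      }
    }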
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index f2b7fa4..f918f9d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -27,17 +28,21 @@ import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -88,7 +93,6 @@ public class LoadTable {
       // Executed if relevant, and used to contain all the other details about the table if not.
       ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName));
       Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
-      ReplicationSpec replicationSpec = event.replicationSpec();
 
       // Normally, on import, trying to create a table or a partition in a db that does not yet exist
       // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying
@@ -106,25 +110,24 @@ public class LoadTable {
         }
       }
 
-      if (table == null) {
-        // If table doesn't exist, allow creating a new one only if the database state is older than the update.
-        if ((parentDb != null) && (!replicationSpec
-            .allowReplacementInto(parentDb.getParameters()))) {
-          // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it.
+      Task<?> tblRootTask = null;
+      ReplLoadOpType loadTblType = getLoadTableType(table);
+      switch (loadTblType) {
+        case LOAD_NEW:
+          break;
+        case LOAD_REPLACE:
+          tblRootTask = dropTableTask(table);
+          break;
+        case LOAD_SKIP:
           return tracker;
-        }
-      } else {
-        if (!replicationSpec.allowReplacementInto(table.getParameters())) {
-          // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it.
-          return tracker;
-        }
+        default:
+          break;
       }
 
       if (tableDesc.getLocation() == null) {
         tableDesc.setLocation(location(tableDesc, parentDb));
       }
 
-
   /* Note: In the following section, Metadata-only import handling logic is
      interleaved with regular repl-import logic. The rule of thumb being
      followed here is that MD-only imports are essentially ALTERs. They do
@@ -134,11 +137,7 @@ public class LoadTable {
      or in the case of an unpartitioned table. In all other cases, it should
      behave like a noop or a pure MD alter.
   */
-      if (table == null) {
-        newTableTasks(tableDesc);
-      } else {
-        existingTableTasks(tableDesc, table, replicationSpec);
-      }
+      newTableTasks(tableDesc, tblRootTask);
 
       // Set Checkpoint task as dependant to create table task. So, if same dump is retried for
       // bootstrap, we skip current table update.
@@ -160,54 +159,48 @@ public class LoadTable {
     }
   }
 
-  private void existingTableTasks(ImportTableDesc tblDesc, Table table,
-      ReplicationSpec replicationSpec) {
-    if (!table.isPartitioned()) {
-
-      LOG.debug("table non-partitioned");
-      if (!replicationSpec.allowReplacementInto(table.getParameters())) {
-        return; // silently return, table is newer than our replacement.
-      }
-
-      Task<? extends Serializable> alterTableTask = alterTableTask(tblDesc, replicationSpec);
-      if (replicationSpec.isMetadataOnly()) {
-        tracker.addTask(alterTableTask);
-      } else {
-        Task<?> loadTableTask =
-            loadTableTask(table, replicationSpec, table.getDataLocation(), event.metadataPath());
-        alterTableTask.addDependentTask(loadTableTask);
-        tracker.addTask(alterTableTask);
-      }
+  private ReplLoadOpType getLoadTableType(Table table) throws InvalidOperationException, HiveException {
+    if (table == null) {
+      return ReplLoadOpType.LOAD_NEW;
+    }
+    if (ReplUtils.replCkptStatus(table.getDbName(), table.getParameters(), context.dumpDirectory)) {
+      return ReplLoadOpType.LOAD_SKIP;
     }
+    return ReplLoadOpType.LOAD_REPLACE;
   }
 
-  private void newTableTasks(ImportTableDesc tblDesc) throws Exception {
+  private void newTableTasks(ImportTableDesc tblDesc, Task<?> tblRootTask) throws Exception {
     Table table = tblDesc.toTable(context.hiveConf);
-    // Either we're dropping and re-creating, or the table didn't exist, and we're creating.
+    ReplicationSpec replicationSpec = event.replicationSpec();
     Task<?> createTableTask =
         tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf);
-    if (event.replicationSpec().isMetadataOnly()) {
-      tracker.addTask(createTableTask);
+    if (tblRootTask == null) {
+      tblRootTask = createTableTask;
+    } else {
+      tblRootTask.addDependentTask(createTableTask);
+    }
+    if (replicationSpec.isMetadataOnly()) {
+      tracker.addTask(tblRootTask);
       return;
     }
 
     Task<?> parentTask = createTableTask;
-    if (event.replicationSpec().isTransactionalTableDump()) {
+    if (replicationSpec.isTransactionalTableDump()) {
       List<String> partNames = isPartitioned(tblDesc) ? event.partitions(tblDesc) : null;
       ReplTxnWork replTxnWork = new ReplTxnWork(tblDesc.getDatabaseName(), tblDesc.getTableName(), partNames,
-              event.replicationSpec().getValidWriteIdList(), ReplTxnWork.OperationType.REPL_WRITEID_STATE);
+              replicationSpec.getValidWriteIdList(), ReplTxnWork.OperationType.REPL_WRITEID_STATE);
       Task<?> replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf);
-      createTableTask.addDependentTask(replTxnTask);
+      parentTask.addDependentTask(replTxnTask);
       parentTask = replTxnTask;
     }
     if (!isPartitioned(tblDesc)) {
       LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table");
       Task<?> loadTableTask =
-          loadTableTask(table, event.replicationSpec(), new Path(tblDesc.getLocation()),
+          loadTableTask(table, replicationSpec, new Path(tblDesc.getLocation()),
               event.metadataPath());
       parentTask.addDependentTask(loadTableTask);
     }
-    tracker.addTask(createTableTask);
+    tracker.addTask(tblRootTask);
   }
 
   private String location(ImportTableDesc tblDesc, Database parentDb)
@@ -249,12 +242,10 @@ public class LoadTable {
     return copyTask;
   }
 
-  private Task<? extends Serializable> alterTableTask(ImportTableDesc tableDesc,
-      ReplicationSpec replicationSpec) {
-    tableDesc.setReplaceMode(true);
-    if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) {
-      tableDesc.setReplicationSpec(replicationSpec);
-    }
-    return tableDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf);
+  private Task<?> dropTableTask(Table table) {
+    assert(table != null);
+    DropTableDesc dropTblDesc = new DropTableDesc(table.getFullyQualifiedName(), table.getTableType(),
+            true, false, event.replicationSpec());
+    return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), dropTblDesc), context.hiveConf);
   }
 }
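In LoadTable the create/load tasks now hang off an optional root task (the drop task in the LOAD_REPLACE case), so the tracker always receives a single root whose dependents run in order. A small dependency-chaining sketch with a toy Task type; this is not the Hive Task API, only an illustration of the tblRootTask wiring:

    import java.util.ArrayList;
    import java.util.List;

    public class TaskChainSketch {
      // Toy task node; the real Hive Task class carries work objects and a driver context.
      static class Task {
        final String name;
        final List<Task> dependents = new ArrayList<>();
        Task(String name) { this.name = name; }
        void addDependentTask(Task t) { dependents.add(t); }
      }

      // Mirrors the tblRootTask handling: if a drop task exists it becomes the root
      // and the create task is chained under it; otherwise the create task is the root.
      static Task buildChain(Task dropTask, Task createTask, Task loadTask) {
        Task root = createTask;
        if (dropTask != null) {
          dropTask.addDependentTask(createTask);
          root = dropTask;
        }
        createTableThenLoad(createTask, loadTask);
        return root;
      }

      static void createTableThenLoad(Task createTask, Task loadTask) {
        createTask.addDependentTask(loadTask);
      }

      public static void main(String[] args) {
        Task root = buildChain(new Task("drop t1"), new Task("create t1"), new Task("load t1"));
        System.out.println(root.name);   // prints "drop t1"
      }
    }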

http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index d7b3104..356a8c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -352,10 +351,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     // import job in its place.
 
     try {
-
+      assert(path != null);
       Path loadPath = new Path(path);
       final FileSystem fs = loadPath.getFileSystem(conf);
 
+      // Make fully qualified path for further use.
+      loadPath = fs.makeQualified(loadPath);
+
       if (!fs.exists(loadPath)) {
         // supposed dump path does not exist.
         throw new FileNotFoundException(loadPath.toUri().toString());
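The load path is qualified right after the FileSystem is resolved, so later comparisons against the stored checkpoint value compare like with like. A minimal Hadoop FileSystem sketch of that step, assuming hadoop-common is on the classpath; the example path is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class QualifyPathSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path loadPath = new Path("/tmp/repl/dump1");        // possibly scheme-less input
        FileSystem fs = loadPath.getFileSystem(conf);
        Path qualified = fs.makeQualified(loadPath);        // e.g. file:/tmp/repl/dump1 or hdfs://nn/...
        System.out.println(qualified);
      }
    }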

http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
index 7281a1c..939884d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -20,21 +20,15 @@ package org.apache.hadoop.hive.ql.parse.repl.load.message;
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -47,7 +41,7 @@ public class DropPartitionHandler extends AbstractMessageHandler {
       String actualDbName = context.isDbNameEmpty() ? msg.getDB() : context.dbName;
       String actualTblName = context.isTableNameEmpty() ? msg.getTable() : context.tableName;
       Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs =
-          genPartSpecs(new Table(msg.getTableObj()),
+          ReplUtils.genPartSpecs(new Table(msg.getTableObj()),
              msg.getPartitions());
       if (partSpecs.size() > 0) {
         DropTableDesc dropPtnDesc = new DropTableDesc(actualDbName + "." + actualTblName,
@@ -70,37 +64,4 @@ public class DropPartitionHandler extends AbstractMessageHandler {
           : new SemanticException("Error reading message members", e);
     }
   }
-
-  private Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(Table table,
-      List<Map<String, String>> partitions) throws SemanticException {
-    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>();
-    int partPrefixLength = 0;
-    if (partitions.size() > 0) {
-      partPrefixLength = partitions.get(0).size();
-      // pick the length of the first ptn, we expect all ptns listed to have the same number of
-      // key-vals.
-    }
-    List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>();
-    for (Map<String, String> ptn : partitions) {
-      // convert each key-value-map to appropriate expression.
-      ExprNodeGenericFuncDesc expr = null;
-      for (Map.Entry<String, String> kvp : ptn.entrySet()) {
-        String key = kvp.getKey();
-        Object val = kvp.getValue();
-        String type = table.getPartColByName(key).getType();
-        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
-        ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
-        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
-            "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val));
-        expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
-      }
-      if (expr != null) {
-        partitionDesc.add(expr);
-      }
-    }
-    if (partitionDesc.size() > 0) {
-      partSpecs.put(partPrefixLength, partitionDesc);
-    }
-    return partSpecs;
-  }
 }
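The genPartSpecs logic removed here now lives in ReplUtils and is shared by DropPartitionHandler and LoadPartitions: each partition key/value map becomes an AND-ed equality predicate, keyed by the number of partition columns covered. A simplified sketch that builds string predicates instead of ExprNodeGenericFuncDesc trees (the method and class names here are illustrative only):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class PartSpecSketch {
      // Builds ["ds='...' and hr='...'", ...] keyed by the partition-prefix length,
      // mirroring the shape of the Map<Integer, List<ExprNodeGenericFuncDesc>> result.
      static Map<Integer, List<String>> genPartSpecs(List<Map<String, String>> partitions) {
        Map<Integer, List<String>> partSpecs = new HashMap<>();
        int partPrefixLength = partitions.isEmpty() ? 0 : partitions.get(0).size();
        List<String> predicates = new ArrayList<>();
        for (Map<String, String> ptn : partitions) {
          StringBuilder expr = new StringBuilder();
          for (Map.Entry<String, String> kvp : ptn.entrySet()) {
            if (expr.length() > 0) {
              expr.append(" and ");
            }
            expr.append(kvp.getKey()).append("='").append(kvp.getValue()).append("'");
          }
          if (expr.length() > 0) {
            predicates.add(expr.toString());
          }
        }
        if (!predicates.isEmpty()) {
          partSpecs.put(partPrefixLength, predicates);
        }
        return partSpecs;
      }

      public static void main(String[] args) {
        Map<String, String> ptn = new LinkedHashMap<>();
        ptn.put("ds", "2018-06-14");
        ptn.put("hr", "10");
        System.out.println(genPartSpecs(Collections.singletonList(ptn)));  // {2=[ds='2018-06-14' and hr='10']}
      }
    }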

http://git-wip-us.apache.org/repos/asf/hive/blob/f20311b0/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 660b119..e99f888 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -10017,19 +10017,20 @@ public class ObjectStore implements RawStore, Configurable {
       final String catName, final String parent_db_name_input, final String parent_tbl_name_input,
       final String foreign_db_name_input, final String foreign_tbl_name_input, boolean allowSql,
       boolean allowJdo) throws MetaException, NoSuchObjectException {
-    final String parent_db_name = parent_db_name_input;
-    final String parent_tbl_name = parent_tbl_name_input;
-    final String foreign_db_name = foreign_db_name_input;
-    final String foreign_tbl_name = foreign_tbl_name_input;
+    final String parent_db_name = (parent_db_name_input != null) ? normalizeIdentifier(parent_db_name_input) : null;
+    final String parent_tbl_name = (parent_tbl_name_input != null) ? normalizeIdentifier(parent_tbl_name_input) : null;
+    final String foreign_db_name = (foreign_db_name_input != null) ? normalizeIdentifier(foreign_db_name_input) : null;
+    final String foreign_tbl_name = (foreign_tbl_name_input != null)
+                                    ? normalizeIdentifier(foreign_tbl_name_input) : null;
     final String db_name;
     final String tbl_name;
     if (foreign_tbl_name == null) {
       // The FK table name might be null if we are retrieving the constraint from the PK side
-      db_name = parent_db_name_input;
-      tbl_name = parent_tbl_name_input;
+      db_name = parent_db_name;
+      tbl_name = parent_tbl_name;
     } else {
-      db_name = foreign_db_name_input;
-      tbl_name = foreign_tbl_name_input;
+      db_name = foreign_db_name;
+      tbl_name = foreign_tbl_name;
     }
     return new GetListHelper<SQLForeignKey>(catName, db_name, tbl_name, allowSql, allowJdo) {
 
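The ObjectStore fix normalizes the parent/foreign db and table names before using them, with a null guard because the FK table name can legitimately be null when the constraint is fetched from the PK side. A null-safe sketch of that pattern, assuming the normalization amounts to a trim-and-lowercase (a simplification of Hive's normalizeIdentifier, not its actual implementation):

    public class NormalizeIdentifierSketch {
      // Null-safe wrapper around identifier normalization; trim-and-lowercase is an
      // assumed simplification of the metastore's normalizeIdentifier behavior.
      static String normalizeIfPresent(String identifier) {
        return (identifier != null) ? identifier.trim().toLowerCase() : null;
      }

      public static void main(String[] args) {
        System.out.println(normalizeIfPresent("  MyDb "));  // mydb
        System.out.println(normalizeIfPresent(null));       // null
      }
    }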
