This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new fe0f1a648b1 HIVE-26301: Fix ACID tables bootstrap during reverse replication in unplanned failover (Haymant Mangla reviewed by Peter Vary) (#3352)
fe0f1a648b1 is described below

commit fe0f1a648b14cdf27edcf7a5d323cbd060104ebf
Author: Haymant Mangla <79496857+hmangl...@users.noreply.github.com>
AuthorDate: Fri Jun 10 16:06:58 2022 +0530

    HIVE-26301: Fix ACID tables bootstrap during reverse replication in unplanned failover (Haymant Mangla reviewed by Peter Vary) (#3352)
---
 .../parse/TestReplicationOptimisedBootstrap.java   | 360 ++++-----------------
 .../TestReplicationScenariosExclusiveReplica.java  | 292 ++++++++++++++++-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |   5 +-
 3 files changed, 349 insertions(+), 308 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 5bd6ac3d362..673e41b3065 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -23,14 +23,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -71,7 +68,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInstances {
+public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosAcidTables {
 
   String extraPrimaryDb;
 
@@ -84,8 +81,9 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
     overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
     overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, UserGroupInformation.getCurrentUser().getUserName());
     overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
-
-    internalBeforeClassSetupExclusiveReplica(overrides, overrides, TestReplicationOptimisedBootstrap.class);
+    overrides.put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+    overrides.put("hive.in.repl.test", "true");
+    internalBeforeClassSetup(overrides, TestReplicationOptimisedBootstrap.class);
   }
 
   @Before
@@ -112,7 +110,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
         .run("create external table t2 (place string) partitioned by (country string)")
         .run("insert into table t2 partition(country='india') values ('chennai')")
         .run("insert into table t2 partition(country='us') values ('new york')")
-        .run("create table t1_managed (id int)")
+        .run("create table t1_managed (id int) clustered by(id) into 3 buckets stored as orc " +
+                "tblproperties (\"transactional\"=\"true\")")
         .run("insert into table t1_managed values (10)")
         .run("insert into table t1_managed values (20),(31),(42)")
         .run("create table t2_managed (place string) partitioned by (country string)")
@@ -125,14 +124,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
         .run("repl status " + replicatedDbName)
         .verifyResult(tuple.lastReplicationId)
         .run("use " + replicatedDbName)
-        .run("show tables like 't1'")
-        .verifyResult("t1")
-        .run("show tables like 't2'")
-        .verifyResult("t2")
-        .run("show tables like 't1_managed'")
-        .verifyResult("t1_managed")
-        .run("show tables like 't2_managed'")
-        .verifyResult("t2_managed")
+        .run("show tables")
+        .verifyResults(new String[]{"t1", "t2", "t1_managed", "t2_managed"})
         .verifyReplTargetProperty(replicatedDbName);
 
     // Do an incremental dump & load. Add one table which we can drop & an empty table as well.
@@ -145,10 +138,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
 
     replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
-        .run("show tables like 't5_managed'")
-        .verifyResult("t5_managed")
-        .run("show tables like 't6_managed'")
-        .verifyResult("t6_managed")
+        .run("show tables")
+        .verifyResults(new String[]{"t1", "t2", "t1_managed", "t2_managed", "t5_managed", "t6_managed"})
         .verifyReplTargetProperty(replicatedDbName);
 
     // Do some modifications on other database with similar table names & some modifications on original source
@@ -161,7 +152,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
         .run("create external table t4 (id int)")
         .run("insert into table t4 values (100)")
         .run("insert into table t4 values (201)")
-        .run("create table t4_managed (id int)")
+        .run("create table t4_managed (id int) clustered by(id) into 3 buckets 
stored as orc " +
+                "tblproperties (\"transactional\"=\"true\")")
         .run("insert into table t4_managed values (110)")
         .run("insert into table t4_managed values (220)")
         .run("insert into table t2 partition(country='france') values 
('lyon')")
@@ -475,281 +467,34 @@ public class TestReplicationOptimisedBootstrap extends 
BaseReplicationAcrossInst
   }
 
   @Test
-  public void testTargetEventIdGenerationAfterFirstIncremental() throws Throwable {
-    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
-    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
-
-    // Do a bootstrap cycle(A->B)
-    primary.dump(primaryDbName, withClause);
-    replica.load(replicatedDbName, primaryDbName, withClause);
-
-    // Add some table & do an incremental dump.
-    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
-        .run("create external table table1 (id int)")
-        .run("insert into table table1 values (100)")
-        .run("create  table table1_managed (name string)")
-        .run("insert into table table1_managed values ('ABC')")
-        .dump(primaryDbName, withClause);
-
-    // Do an incremental load
-    replica.load(replicatedDbName, primaryDbName, withClause);
-
-    // Get the latest notification from the notification log for the target database, just after replication.
-    CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
-
-    // Check the tables are there post incremental load.
-    replica.run("repl status " + replicatedDbName)
-        .verifyResult(tuple.lastReplicationId)
-        .run("use " + replicatedDbName)
-        .run("select id from table1")
-        .verifyResult("100")
-        .run("select name from table1_managed")
-        .verifyResult("ABC")
-        .verifyReplTargetProperty(replicatedDbName);
-
-    // Do some modifications on the source cluster, so we have some entries in the table diff.
-    primary.run("use " + primaryDbName)
-        .run("create table table2_managed (id string)")
-        .run("insert into table table1_managed values ('SDC')")
-        .run("insert into table table2_managed values ('A'),('B'),('C')");
-
-
-    // Do some modifications in another database to have unrelated events as well after the last load, which should
-    // get filtered.
-
-    primary.run("create database " + extraPrimaryDb)
-        .run("use " + extraPrimaryDb)
-        .run("create external table t1 (id int)")
-        .run("insert into table t1 values (15),(1),(96)")
-        .run("create  table t1_managed (id string)")
-        .run("insert into table t1_managed values ('SA'),('PS')");
-
-    // Do some modifications on the target database.
-    replica.run("use " + replicatedDbName)
-        .run("alter database "+ replicatedDbName + " set DBPROPERTIES 
('key1'='value1')")
-        .run("alter database "+ replicatedDbName + " set DBPROPERTIES 
('key2'='value2')");
-
-    // Validate the current replication id on original target has changed now.
-    assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());
-
-    // Prepare for reverse replication.
-    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
-    Path newReplDir = new Path(replica.repldDir + "reverse1");
-    replicaFs.mkdirs(newReplDir);
-    withClause = ReplicationTestUtils.includeExternalTableClause(true);
-    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
-
-    tuple = replica.dump(replicatedDbName);
-
-    // Check event ack file should get created.
-    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
-        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
-
-    // Get the target event id.
-    NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
-        .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), -1,
-            new DatabaseAndTableFilter(replicatedDbName, null));
-
-    // There should be 2 events, two custom alter operations.
-    assertEquals(2, nl.getEvents().size());
-  }
-
-  @Test
-  public void testTargetEventIdGeneration() throws Throwable {
-    // Do a cycle of bootstrap dump & load.
-    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
-    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
-
-    // Do a bootstrap cycle(A->B)
-    primary.dump(primaryDbName, withClause);
-    replica.load(replicatedDbName, primaryDbName, withClause);
-
-    // Add some table & do the first incremental dump.
-    primary.run("use " + primaryDbName)
-        .run("create external table tablei1 (id int)")
-        .run("create external table tablei2 (id int)")
-        .run("create table tablem1 (id int)")
-        .run("create table tablem2 (id int)")
-        .run("insert into table tablei1 values(1),(2),(3),(4)")
-        .run("insert into table tablei2 values(10),(20),(30),(40)")
-        .run("insert into table tablem1 values(5),(10),(15),(20)")
-        .run("insert into table tablem2 values(6),(12),(18),(24)")
-        .dump(primaryDbName, withClause);
-
-    // Do the incremental load, and check everything is intact.
-    replica.load(replicatedDbName, primaryDbName, withClause)
-        .run("use "+ replicatedDbName)
-        .run("select id from tablei1")
-        .verifyResults(new String[]{"1","2","3","4"})
-        .run("select id from tablei2")
-        .verifyResults(new String[]{"10","20","30","40"})
-        .run("select id from tablem1")
-        .verifyResults(new String[]{"5","10","15","20"})
-        .run("select id from tablem2")
-        .verifyResults(new String[]{"6","12","18","24"});
-
-    // Do some modifications & call for the second cycle of incremental dump & 
load.
-    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
-        .run("create external table table1 (id int)")
-        .run("insert into table table1 values (25),(35),(82)")
-        .run("create  table table1_managed (name string)")
-        .run("insert into table table1_managed values ('CAD'),('DAS'),('MSA')")
-        .run("insert into table tablei1 values(15),(62),(25),(62)")
-        .run("insert into table tablei2 values(10),(22),(11),(22)")
-        .run("insert into table tablem1 values(5),(10),(15),(20)")
-        .run("alter table table1 set TBLPROPERTIES('comment'='abc')")
-        .dump(primaryDbName, withClause);
-
-    // Do an incremental load
-    replica.load(replicatedDbName, primaryDbName, withClause);
-
-    // Get the latest notification from the notification log for the target database, just after replication.
-    CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
-
-    // Check the tables are there post incremental load.
-    replica.run("repl status " + replicatedDbName)
-        .verifyResult(tuple.lastReplicationId)
-        .run("use " + replicatedDbName)
-        .run("select id from table1")
-        .verifyResults(new String[]{"25", "35", "82"})
-        .run("select name from table1_managed")
-        .verifyResults(new String[]{"CAD", "DAS", "MSA"})
-        .verifyReplTargetProperty(replicatedDbName);
-
-    // Do some modifications on the source cluster, so we have some entries in the table diff.
-    primary.run("use " + primaryDbName)
-        .run("create table table2_managed (id string)")
-        .run("insert into table table1_managed values ('AAA'),('BBB')")
-        .run("insert into table table2_managed values ('A1'),('B1'),('C2')");
-
-
-    // Do some modifications in another database to have unrelated events as well after the last load, which should
-    // get filtered.
-
-    primary.run("create database " + extraPrimaryDb)
-        .run("use " + extraPrimaryDb)
-        .run("create external table table1 (id int)")
-        .run("insert into table table1 values (15),(1),(96)")
-        .run("create  table table1_managed (id string)")
-        .run("insert into table table1_managed values ('SAA'),('PSA')");
-
-    // Do some modifications on the target database.
-    replica.run("use " + replicatedDbName)
-        .run("alter database "+ replicatedDbName + " set DBPROPERTIES 
('repl1'='value1')")
-        .run("alter database "+ replicatedDbName + " set DBPROPERTIES 
('repl2'='value2')");
-
-    // Validate the current replication id on original target has changed now.
-    assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());
-
-    // Prepare for reverse replication.
-    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
-    Path newReplDir = new Path(replica.repldDir + "reverse01");
-    replicaFs.mkdirs(newReplDir);
-    withClause = ReplicationTestUtils.includeExternalTableClause(true);
-    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
-
-    tuple = replica.dump(replicatedDbName, withClause);
-
-    // Check event ack file should get created.
-    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
-        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
-
-    // Get the target event id.
-    NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
-        .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), 10,
-            new DatabaseAndTableFilter(replicatedDbName, null));
-
-    assertEquals(0, nl.getEventsSize());
-  }
-
-  @Test
-  public void testTargetEventIdWithNotificationsExpired() throws Throwable {
-    // Do a cycle of bootstrap dump & load.
-    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
-    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
-
-    // Do a bootstrap cycle(A->B)
-    primary.dump(primaryDbName, withClause);
-    replica.load(replicatedDbName, primaryDbName, withClause);
+  public void testReverseBootstrap() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    List<String> withClause = setUpFirstIterForOptimisedBootstrap();
 
-    // Add some table & do the first incremental dump.
-    primary.run("use " + primaryDbName)
-        .run("create external table tablei1 (id int)")
-        .run("create table tablem1 (id int)")
-        .run("insert into table tablei1 values(1),(2),(3),(4)")
-        .run("insert into table tablem1 values(5),(10),(15),(20)")
-        .dump(primaryDbName, withClause);
+    // Open 3 txns for Database which is not under replication
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, 
primaryConf);
 
-    // Do the incremental load, and check everything is intact.
-    replica.load(replicatedDbName, primaryDbName, withClause)
-        .run("use "+ replicatedDbName)
-        .run("select id from tablei1")
-        .verifyResults(new String[]{"1","2","3","4"})
-        .run("select id from tablem1")
-        .verifyResults(new String[]{"5","10","15","20"});
-
-    // Explicitly make the notification logs.
-    // Get the latest notification from the notification log for the target database, just after replication.
-    CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
-    // Inject a behaviour where some events are missing from the notification_log table.
-    // This ensures the incremental dump doesn't get all events for replication.
-    InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>
-        eventIdSkipper =
-        new InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>() {
-
-      @Nullable
-      @Override
-      public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) {
-        if (null != eventIdList) {
-          List<NotificationEvent> eventIds = eventIdList.getEvents();
-          List<NotificationEvent> outEventIds = new ArrayList<>();
-          for (NotificationEvent event : eventIds) {
-            // Skip the last db event.
-            if (event.getDbName().equalsIgnoreCase(replicatedDbName)) {
-              injectionPathCalled = true;
-              continue;
-            }
-            outEventIds.add(event);
-          }
-
-          // Return the new list
-          return new NotificationEventResponse(outEventIds);
-        } else {
-          return null;
-        }
-      }
-    };
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = 
allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
 
-    try {
-      InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdSkipper);
-
-      // Prepare for reverse replication.
-      DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
-      Path newReplDir = new Path(replica.repldDir + "reverse01");
-      replicaFs.mkdirs(newReplDir);
-      withClause = ReplicationTestUtils.includeExternalTableClause(true);
-      withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
-
-      try {
-        replica.dump(replicatedDbName, withClause);
-        fail("Expected the dump to fail since the notification event is 
missing.");
-      } catch (Exception e) {
-        // Expected due to missing notification log entry.
-      }
-
-      // Check if there is a non-recoverable error or not.
-      Path nonRecoverablePath =
-          TestReplicationScenarios.getNonRecoverablePath(newReplDir, replicatedDbName, replica.hiveConf);
-      assertTrue(replicaFs.exists(nonRecoverablePath));
-    } finally {
-      InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour();  // reset the behaviour
-    }
-  }
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForSourceDb = openTxns(numTxnsForPrimaryDb, txnHandler, 
primaryConf);
 
+    // Allocate write ids for both tables of source database.
+    Map<String, Long> tablesInSourceDb = new HashMap<>();
+    tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb + 4);
+    tablesInSourceDb.put("t2", (long) numTxnsForPrimaryDb);
+    allocateWriteIdsForTablesAndAcquireLocks(replicatedDbName, tablesInSourceDb, txnHandler,
+            txnsForSourceDb, replica.getConf());
 
-  @Test
-  public void testReverseBootstrap() throws Throwable {
-    List<String> withClause = setUpFirstIterForOptimisedBootstrap();
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
 
     // Do a reverse second dump, this should do a bootstrap dump for the tables in the table_diff and incremental for
     // rest.
@@ -757,6 +502,14 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
     assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
     WarehouseInstance.Tuple tuple = replica.dump(replicatedDbName, withClause);
 
+    //Verify that openTxns for sourceDb were aborted before proceeding with bootstrap dump.
+    verifyAllOpenTxnsAborted(txnsForSourceDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
     String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
     // _bootstrap directory should be created as bootstrap enabled on external tables.
     Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/" + EximUtil.METADATA_PATH_NAME +"/" + replicatedDbName);
@@ -950,7 +703,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
 
     // Create some partitioned and non partitioned tables and do a dump & load.
     WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
-        .run("create table t1 (id int)")
+        .run("create table t1 (id int) clustered by(id) into 3 buckets stored 
as orc " +
+                "tblproperties (\"transactional\"=\"true\")")
         .run("insert into table t1 values (1)")
         .run("insert into table t1 values (2),(3),(4)")
         .run("create table t2 (id int)")
@@ -968,14 +722,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
         .run("repl status " + replicatedDbName)
         .verifyResult(tuple.lastReplicationId)
         .run("use " + replicatedDbName)
-        .run("show tables like 't1'")
-        .verifyResult("t1")
-        .run("show tables like 't2'")
-        .verifyResult("t2")
-        .run("show tables like 't3'")
-        .verifyResult("t3")
-        .run("show tables like 't4'")
-        .verifyResult("t4")
+        .run("show tables")
+        .verifyResults(new String[]{"t1", "t2", "t3", "t4"})
         .verifyReplTargetProperty(replicatedDbName);
 
     // Prepare for reverse bootstrap.
@@ -1083,7 +831,10 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
 
     // Create 4 managed tables and do a dump & load.
     WarehouseInstance.Tuple tuple =
-        primary.run("use " + primaryDbName).run("create table t1 (id int)").run("insert into table t1 values (1)")
+        primary.run("use " + primaryDbName)
+                .run("create table t1 (id int) clustered by(id) into 3 buckets 
stored as orc " +
+                        "tblproperties (\"transactional\"=\"true\")")
+                .run("insert into table t1 values (1)")
             .run("insert into table t1 values (2),(3),(4)")
             .run("create table t2 (place string) partitioned by (country 
string)")
             .run("insert into table t2 partition(country='india') values 
('chennai')")
@@ -1100,7 +851,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
         .verifyResult("t3").run("show tables like 't4'").verifyResult("t4").verifyReplTargetProperty(replicatedDbName);
 
     // Do some modifications on original source cluster. The diff becomes (tnew_managed, t1, t2, t3)
-    primary.run("use " + primaryDbName).run("create table tnew_managed (id int)")
+    primary.run("use " + primaryDbName).run("create table tnew_managed (id int) clustered by(id) into 3 buckets " +
+                    "stored as orc tblproperties (\"transactional\"=\"true\")")
         .run("insert into table t1 values (25)").run("insert into table tnew_managed values (110)")
         .run("insert into table t2 partition(country='france') values ('lyon')").run("drop table t3")
         .run("alter database "+ primaryDbName + " set DBPROPERTIES ('key1'='value1')");
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
index c9f4753ba99..8710e2c70a0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
@@ -19,10 +19,17 @@ package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -31,6 +38,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import javax.annotation.Nullable;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -39,7 +47,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -47,12 +54,22 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Test replication scenarios with staging on replica.
  */
 public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcrossInstances {
 
+  String extraPrimaryDb;
+
   @BeforeClass
   public static void classLevelSetup() throws Exception {
     Map<String, String> overrides = new HashMap<>();
@@ -68,6 +85,7 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr
   @Before
   public void setup() throws Throwable {
     super.setup();
+    extraPrimaryDb = "extra_" + primaryDbName;
   }
 
   @After
@@ -75,6 +93,278 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr
     super.tearDown();
   }
 
+  @Test
+  public void testTargetEventIdGenerationAfterFirstIncrementalInOptFailover() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some table & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .run("create external table table1 (id int)")
+            .run("insert into table table1 values (100)")
+            .run("create  table table1_managed (name string)")
+            .run("insert into table table1_managed values ('ABC')")
+            .dump(primaryDbName, withClause);
+
+    // Do an incremental load
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Get the latest notification from the notification log for the target database, just after replication.
+    CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+
+    // Check the tables are there post incremental load.
+    replica.run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("select id from table1")
+            .verifyResult("100")
+            .run("select name from table1_managed")
+            .verifyResult("ABC")
+            .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on the source cluster, so we have some entries in the table diff.
+    primary.run("use " + primaryDbName)
+            .run("create table table2_managed (id string)")
+            .run("insert into table table1_managed values ('SDC')")
+            .run("insert into table table2_managed values ('A'),('B'),('C')");
+
+
+    // Do some modifications in another database to have unrelated events as well after the last load, which should
+    // get filtered.
+
+    primary.run("create database " + extraPrimaryDb)
+            .run("use " + extraPrimaryDb)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (15),(1),(96)")
+            .run("create  table t1_managed (id string)")
+            .run("insert into table t1_managed values ('SA'),('PS')");
+
+    // Do some modifications on the target database.
+    replica.run("use " + replicatedDbName)
+            .run("alter database "+ replicatedDbName + " set DBPROPERTIES 
('key1'='value1')")
+            .run("alter database "+ replicatedDbName + " set DBPROPERTIES 
('key2'='value2')");
+
+    // Validate the current replication id on original target has changed now.
+    assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "reverse1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    tuple = replica.dump(replicatedDbName);
+
+    // Check event ack file should get created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+            replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Get the target event id.
+    NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
+            .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), -1,
+                    new DatabaseAndTableFilter(replicatedDbName, null));
+
+    // There should be 2 events, two custom alter operations.
+    assertEquals(2, nl.getEvents().size());
+  }
+
+  @Test
+  public void testTargetEventIdGenerationInOptimisedFailover() throws Throwable {
+    // Do a cycle of bootstrap dump & load.
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some table & do the first incremental dump.
+    primary.run("use " + primaryDbName)
+            .run("create external table tablei1 (id int)")
+            .run("create external table tablei2 (id int)")
+            .run("create table tablem1 (id int)")
+            .run("create table tablem2 (id int)")
+            .run("insert into table tablei1 values(1),(2),(3),(4)")
+            .run("insert into table tablei2 values(10),(20),(30),(40)")
+            .run("insert into table tablem1 values(5),(10),(15),(20)")
+            .run("insert into table tablem2 values(6),(12),(18),(24)")
+            .dump(primaryDbName, withClause);
+
+    // Do the incremental load, and check everything is intact.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use "+ replicatedDbName)
+            .run("select id from tablei1")
+            .verifyResults(new String[]{"1","2","3","4"})
+            .run("select id from tablei2")
+            .verifyResults(new String[]{"10","20","30","40"})
+            .run("select id from tablem1")
+            .verifyResults(new String[]{"5","10","15","20"})
+            .run("select id from tablem2")
+            .verifyResults(new String[]{"6","12","18","24"});
+
+    // Do some modifications & call for the second cycle of incremental dump & 
load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .run("create external table table1 (id int)")
+            .run("insert into table table1 values (25),(35),(82)")
+            .run("create  table table1_managed (name string)")
+            .run("insert into table table1_managed values 
('CAD'),('DAS'),('MSA')")
+            .run("insert into table tablei1 values(15),(62),(25),(62)")
+            .run("insert into table tablei2 values(10),(22),(11),(22)")
+            .run("insert into table tablem1 values(5),(10),(15),(20)")
+            .run("alter table table1 set TBLPROPERTIES('comment'='abc')")
+            .dump(primaryDbName, withClause);
+
+    // Do an incremental load
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Get the latest notification from the notification log for the target database, just after replication.
+    CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+
+    // Check the tables are there post incremental load.
+    replica.run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("select id from table1")
+            .verifyResults(new String[]{"25", "35", "82"})
+            .run("select name from table1_managed")
+            .verifyResults(new String[]{"CAD", "DAS", "MSA"})
+            .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on the source cluster, so we have some entries in the table diff.
+    primary.run("use " + primaryDbName)
+            .run("create table table2_managed (id string)")
+            .run("insert into table table1_managed values ('AAA'),('BBB')")
+            .run("insert into table table2_managed values 
('A1'),('B1'),('C2')");
+
+
+    // Do some modifications in another database to have unrelated events as well after the last load, which should
+    // get filtered.
+
+    primary.run("create database " + extraPrimaryDb)
+            .run("use " + extraPrimaryDb)
+            .run("create external table table1 (id int)")
+            .run("insert into table table1 values (15),(1),(96)")
+            .run("create  table table1_managed (id string)")
+            .run("insert into table table1_managed values ('SAA'),('PSA')");
+
+    // Do some modifications on the target database.
+    replica.run("use " + replicatedDbName)
+            .run("alter database "+ replicatedDbName + " set DBPROPERTIES 
('repl1'='value1')")
+            .run("alter database "+ replicatedDbName + " set DBPROPERTIES 
('repl2'='value2')");
+
+    // Validate the current replication id on original target has changed now.
+    assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "reverse01");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check event ack file should get created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+            replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Get the target event id.
+    NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
+            .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), 10,
+                    new DatabaseAndTableFilter(replicatedDbName, null));
+
+    assertEquals(0, nl.getEventsSize());
+  }
+
+  @Test
+  public void testTargetEventIdWithNotificationsExpiredInOptimisedFailover() throws Throwable {
+    // Do a cycle of bootstrap dump & load.
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some table & do the first incremental dump.
+    primary.run("use " + primaryDbName)
+            .run("create external table tablei1 (id int)")
+            .run("create table tablem1 (id int)")
+            .run("insert into table tablei1 values(1),(2),(3),(4)")
+            .run("insert into table tablem1 values(5),(10),(15),(20)")
+            .dump(primaryDbName, withClause);
+
+    // Do the incremental load, and check everything is intact.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use "+ replicatedDbName)
+            .run("select id from tablei1")
+            .verifyResults(new String[]{"1","2","3","4"})
+            .run("select id from tablem1")
+            .verifyResults(new String[]{"5","10","15","20"});
+
+    // Explicitly make the notification logs.
+    // Get the latest notification from the notification log for the target database, just after replication.
+    CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+    // Inject a behaviour where some events are missing from the notification_log table.
+    // This ensures the incremental dump doesn't get all events for replication.
+    InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>
+            eventIdSkipper =
+            new InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>() {
+
+              @Nullable
+              @Override
+              public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) {
+                if (null != eventIdList) {
+                  List<NotificationEvent> eventIds = eventIdList.getEvents();
+                  List<NotificationEvent> outEventIds = new ArrayList<>();
+                  for (NotificationEvent event : eventIds) {
+                    // Skip the last db event.
+                    if (event.getDbName().equalsIgnoreCase(replicatedDbName)) {
+                      injectionPathCalled = true;
+                      continue;
+                    }
+                    outEventIds.add(event);
+                  }
+
+                  // Return the new list
+                  return new NotificationEventResponse(outEventIds);
+                } else {
+                  return null;
+                }
+              }
+            };
+
+    try {
+      InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdSkipper);
+
+      // Prepare for reverse replication.
+      DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+      Path newReplDir = new Path(replica.repldDir + "reverse01");
+      replicaFs.mkdirs(newReplDir);
+      withClause = ReplicationTestUtils.includeExternalTableClause(true);
+      withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+      try {
+        replica.dump(replicatedDbName, withClause);
+        fail("Expected the dump to fail since the notification event is 
missing.");
+      } catch (Exception e) {
+        // Expected due to missing notification log entry.
+      }
+
+      // Check if there is a non-recoverable error or not.
+      Path nonRecoverablePath =
+              TestReplicationScenarios.getNonRecoverablePath(newReplDir, replicatedDbName, replica.hiveConf);
+      assertTrue(replicaFs.exists(nonRecoverablePath));
+    } finally {
+      InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour();  // reset the behaviour
+    }
+  }
+
   @Test
   public void testDistCpCopyWithRemoteStagingAndCopyTaskOnTarget() throws Throwable {
     List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir, true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index b76354eb459..667ede3ca74 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -1088,9 +1088,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     // of the ACID tables might be included for bootstrap during incremental dump. For the old policy, it's because the table
     // may not satisfy the old policy while satisfying the new policy. For the filter, it may happen that the table
     // is renamed and starts satisfying the policy.
-    return ((!work.replScope.includeAllTables())
-            || (previousReplScopeModified())
-            || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES));
+    return !work.replScope.includeAllTables() || previousReplScopeModified() || !tablesForBootstrap.isEmpty()
+            || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
   }
 
   private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, Path cmRoot, Hive db) throws Exception {
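
For readers tracing what the new testReverseBootstrap asserts: the fix ensures that an optimised (reverse) bootstrap dump aborts only the open transactions holding write ids on the database being bootstrapped, while transactions against other databases survive the dump. The sketch below condenses that flow from the test changes above; it is illustrative, not the committed test verbatim. openTxns, allocateWriteIdsForTablesAndAcquireLocks, verifyAllOpenTxnsAborted, verifyAllOpenTxnsNotAborted and releaseLocks are assumed to be helpers from the BaseReplicationScenariosAcidTables test base as used in the diff, while TxnStore, TxnUtils and AbortTxnsRequest are the actual metastore transaction APIs.

    // Minimal sketch, assuming the test-base helper signatures used in the diff above.
    HiveConf primaryConf = primary.getConf();
    TxnStore txnHandler = TxnUtils.getTxnStore(primaryConf);

    // Txns writing to a database that is NOT being reverse-bootstrapped:
    // these must survive the dump.
    List<Long> unrelatedTxns = openTxns(3, txnHandler, primaryConf);
    Map<String, Long> unrelatedTables = new HashMap<>();
    unrelatedTables.put("t1", 3L);
    List<Long> lockIds = allocateWriteIdsForTablesAndAcquireLocks(
        primaryDbName + "_extra", unrelatedTables, txnHandler, unrelatedTxns, primaryConf);

    // Txns holding write ids on the database being bootstrapped: the dump is
    // expected to abort these once hive.repl.bootstrap.dump.open.txn.timeout expires.
    List<Long> sourceDbTxns = openTxns(2, txnHandler, primaryConf);
    Map<String, Long> sourceTables = new HashMap<>();
    sourceTables.put("t1", 2L);
    allocateWriteIdsForTablesAndAcquireLocks(
        replicatedDbName, sourceTables, txnHandler, sourceDbTxns, replica.getConf());

    // Reverse bootstrap dump from the old target.
    replica.dump(replicatedDbName, withClause);

    verifyAllOpenTxnsAborted(sourceDbTxns, primaryConf);      // aborted by the dump
    verifyAllOpenTxnsNotAborted(unrelatedTxns, primaryConf);  // left untouched

    // Clean up the txns the dump deliberately did not touch.
    txnHandler.abortTxns(new AbortTxnsRequest(unrelatedTxns));
    releaseLocks(txnHandler, lockIds);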
