This is an automated email from the ASF dual-hosted git repository. pvary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new fe0f1a648b1 HIVE-26301: Fix ACID tables bootstrap during reverse replication in unplanned failover (Haymant Mangla reviewed by Peter Vary) (#3352) fe0f1a648b1 is described below commit fe0f1a648b14cdf27edcf7a5d323cbd060104ebf Author: Haymant Mangla <79496857+hmangl...@users.noreply.github.com> AuthorDate: Fri Jun 10 16:06:58 2022 +0530 HIVE-26301: Fix ACID tables bootstrap during reverse replication in unplanned failover (Haymant Mangla reviewed by Peter Vary) (#3352) --- .../parse/TestReplicationOptimisedBootstrap.java | 360 ++++----------------- .../TestReplicationScenariosExclusiveReplica.java | 292 ++++++++++++++++- .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 5 +- 3 files changed, 349 insertions(+), 308 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java index 5bd6ac3d362..673e41b3065 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java @@ -23,14 +23,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.QuotaUsage; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; -import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import 
org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.security.UserGroupInformation; @@ -71,7 +68,7 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInstances { +public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosAcidTables { String extraPrimaryDb; @@ -84,8 +81,9 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true"); overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, UserGroupInformation.getCurrentUser().getUserName()); overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true"); - - internalBeforeClassSetupExclusiveReplica(overrides, overrides, TestReplicationOptimisedBootstrap.class); + overrides.put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + overrides.put("hive.in.repl.test", "true"); + internalBeforeClassSetup(overrides, TestReplicationOptimisedBootstrap.class); } @Before @@ -112,7 +110,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst .run("create external table t2 (place string) partitioned by (country string)") .run("insert into table t2 partition(country='india') values ('chennai')") .run("insert into table t2 partition(country='us') values ('new york')") - .run("create table t1_managed (id int)") + .run("create table t1_managed (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties 
(\"transactional\"=\"true\")") .run("insert into table t1_managed values (10)") .run("insert into table t1_managed values (20),(31),(42)") .run("create table t2_managed (place string) partitioned by (country string)") @@ -125,14 +124,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst .run("repl status " + replicatedDbName) .verifyResult(tuple.lastReplicationId) .run("use " + replicatedDbName) - .run("show tables like 't1'") - .verifyResult("t1") - .run("show tables like 't2'") - .verifyResult("t2") - .run("show tables like 't1_managed'") - .verifyResult("t1_managed") - .run("show tables like 't2_managed'") - .verifyResult("t2_managed") + .run("show tables") + .verifyResults(new String[]{"t1", "t2", "t1_managed", "t2_managed"}) .verifyReplTargetProperty(replicatedDbName); // Do an incremental dump & load, Add one table which we can drop & an empty table as well. @@ -145,10 +138,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst replica.load(replicatedDbName, primaryDbName, withClause) .run("use " + replicatedDbName) - .run("show tables like 't5_managed'") - .verifyResult("t5_managed") - .run("show tables like 't6_managed'") - .verifyResult("t6_managed") + .run("show tables") + .verifyResults(new String[]{"t1", "t2", "t1_managed", "t2_managed", "t5_managed", "t6_managed"}) .verifyReplTargetProperty(replicatedDbName); // Do some modifications on other database with similar table names & some modifications on original source @@ -161,7 +152,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst .run("create external table t4 (id int)") .run("insert into table t4 values (100)") .run("insert into table t4 values (201)") - .run("create table t4_managed (id int)") + .run("create table t4_managed (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") .run("insert into table t4_managed values (110)") .run("insert into table 
t4_managed values (220)") .run("insert into table t2 partition(country='france') values ('lyon')") @@ -475,281 +467,34 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst } @Test - public void testTargetEventIdGenerationAfterFirstIncremental() throws Throwable { - List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); - withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'"); - - // Do a bootstrap cycle(A->B) - primary.dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, withClause); - - // Add some table & do an incremental dump. - WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) - .run("create external table table1 (id int)") - .run("insert into table table1 values (100)") - .run("create table table1_managed (name string)") - .run("insert into table table1_managed values ('ABC')") - .dump(primaryDbName, withClause); - - // Do an incremental load - replica.load(replicatedDbName, primaryDbName, withClause); - - // Get the latest notification from the notification log for the target database, just after replication. - CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId(); - - // Check the tables are there post incremental load. - replica.run("repl status " + replicatedDbName) - .verifyResult(tuple.lastReplicationId) - .run("use " + replicatedDbName) - .run("select id from table1") - .verifyResult("100") - .run("select name from table1_managed") - .verifyResult("ABC") - .verifyReplTargetProperty(replicatedDbName); - - // Do some modifications on the source cluster, so we have some entries in the table diff. 
- primary.run("use " + primaryDbName) - .run("create table table2_managed (id string)") - .run("insert into table table1_managed values ('SDC')") - .run("insert into table table2_managed values ('A'),('B'),('C')"); - - - // Do some modifications in another database to have unrelated events as well after the last load, which should - // get filtered. - - primary.run("create database " + extraPrimaryDb) - .run("use " + extraPrimaryDb) - .run("create external table t1 (id int)") - .run("insert into table t1 values (15),(1),(96)") - .run("create table t1_managed (id string)") - .run("insert into table t1_managed values ('SA'),('PS')"); - - // Do some modifications on the target database. - replica.run("use " + replicatedDbName) - .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('key1'='value1')") - .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('key2'='value2')"); - - // Validate the current replication id on original target has changed now. - assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId()); - - // Prepare for reverse replication. - DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem(); - Path newReplDir = new Path(replica.repldDir + "reverse1"); - replicaFs.mkdirs(newReplDir); - withClause = ReplicationTestUtils.includeExternalTableClause(true); - withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'"); - - tuple = replica.dump(replicatedDbName); - - // Check event ack file should get created. - assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist", - replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE))); - - // Get the target event id. 
- NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf) - .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), -1, - new DatabaseAndTableFilter(replicatedDbName, null)); - - // There should be 2 events, two custom alter operations. - assertEquals(2, nl.getEvents().size()); - } - - @Test - public void testTargetEventIdGeneration() throws Throwable { - // Do a a cycle of bootstrap dump & load. - List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); - withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'"); - - // Do a bootstrap cycle(A->B) - primary.dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, withClause); - - // Add some table & do the first incremental dump. - primary.run("use " + primaryDbName) - .run("create external table tablei1 (id int)") - .run("create external table tablei2 (id int)") - .run("create table tablem1 (id int)") - .run("create table tablem2 (id int)") - .run("insert into table tablei1 values(1),(2),(3),(4)") - .run("insert into table tablei2 values(10),(20),(30),(40)") - .run("insert into table tablem1 values(5),(10),(15),(20)") - .run("insert into table tablem2 values(6),(12),(18),(24)") - .dump(primaryDbName, withClause); - - // Do the incremental load, and check everything is intact. - replica.load(replicatedDbName, primaryDbName, withClause) - .run("use "+ replicatedDbName) - .run("select id from tablei1") - .verifyResults(new String[]{"1","2","3","4"}) - .run("select id from tablei2") - .verifyResults(new String[]{"10","20","30","40"}) - .run("select id from tablem1") - .verifyResults(new String[]{"5","10","15","20"}) - .run("select id from tablem2") - .verifyResults(new String[]{"6","12","18","24"}); - - // Do some modifications & call for the second cycle of incremental dump & load. 
- WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) - .run("create external table table1 (id int)") - .run("insert into table table1 values (25),(35),(82)") - .run("create table table1_managed (name string)") - .run("insert into table table1_managed values ('CAD'),('DAS'),('MSA')") - .run("insert into table tablei1 values(15),(62),(25),(62)") - .run("insert into table tablei2 values(10),(22),(11),(22)") - .run("insert into table tablem1 values(5),(10),(15),(20)") - .run("alter table table1 set TBLPROPERTIES('comment'='abc')") - .dump(primaryDbName, withClause); - - // Do an incremental load - replica.load(replicatedDbName, primaryDbName, withClause); - - // Get the latest notification from the notification log for the target database, just after replication. - CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId(); - - // Check the tables are there post incremental load. - replica.run("repl status " + replicatedDbName) - .verifyResult(tuple.lastReplicationId) - .run("use " + replicatedDbName) - .run("select id from table1") - .verifyResults(new String[]{"25", "35", "82"}) - .run("select name from table1_managed") - .verifyResults(new String[]{"CAD", "DAS", "MSA"}) - .verifyReplTargetProperty(replicatedDbName); - - // Do some modifications on the source cluster, so we have some entries in the table diff. - primary.run("use " + primaryDbName) - .run("create table table2_managed (id string)") - .run("insert into table table1_managed values ('AAA'),('BBB')") - .run("insert into table table2_managed values ('A1'),('B1'),('C2')"); - - - // Do some modifications in another database to have unrelated events as well after the last load, which should - // get filtered. 
- - primary.run("create database " + extraPrimaryDb) - .run("use " + extraPrimaryDb) - .run("create external table table1 (id int)") - .run("insert into table table1 values (15),(1),(96)") - .run("create table table1_managed (id string)") - .run("insert into table table1_managed values ('SAA'),('PSA')"); - - // Do some modifications on the target database. - replica.run("use " + replicatedDbName) - .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('repl1'='value1')") - .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('repl2'='value2')"); - - // Validate the current replication id on original target has changed now. - assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId()); - - // Prepare for reverse replication. - DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem(); - Path newReplDir = new Path(replica.repldDir + "reverse01"); - replicaFs.mkdirs(newReplDir); - withClause = ReplicationTestUtils.includeExternalTableClause(true); - withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'"); - - tuple = replica.dump(replicatedDbName, withClause); - - // Check event ack file should get created. - assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist", - replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE))); - - // Get the target event id. - NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf) - .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), 10, - new DatabaseAndTableFilter(replicatedDbName, null)); - - assertEquals(0, nl.getEventsSize()); - } - - @Test - public void testTargetEventIdWithNotificationsExpired() throws Throwable { - // Do a a cycle of bootstrap dump & load. 
- List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); - withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'"); - - // Do a bootstrap cycle(A->B) - primary.dump(primaryDbName, withClause); - replica.load(replicatedDbName, primaryDbName, withClause); + public void testReverseBootstrap() throws Throwable { + HiveConf primaryConf = primary.getConf(); + TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf()); + List<String> withClause = setUpFirstIterForOptimisedBootstrap(); - // Add some table & do the first incremental dump. - primary.run("use " + primaryDbName) - .run("create external table tablei1 (id int)") - .run("create table tablem1 (id int)") - .run("insert into table tablei1 values(1),(2),(3),(4)") - .run("insert into table tablem1 values(5),(10),(15),(20)") - .dump(primaryDbName, withClause); + // Open 3 txns for Database which is not under replication + int numTxnsForSecDb = 3; + List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf); - // Do the incremental load, and check everything is intact. - replica.load(replicatedDbName, primaryDbName, withClause) - .run("use "+ replicatedDbName) - .run("select id from tablei1") - .verifyResults(new String[]{"1","2","3","4"}) - .run("select id from tablem1") - .verifyResults(new String[]{"5","10","15","20"}); - - // Explicitly make the notification logs. - // Get the latest notification from the notification log for the target database, just after replication. - CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId(); - // Inject a behaviour where some events missing from notification_log table. - // This ensures the incremental dump doesn't get all events for replication. 
- InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse> - eventIdSkipper = - new InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>() { - - @Nullable - @Override - public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) { - if (null != eventIdList) { - List<NotificationEvent> eventIds = eventIdList.getEvents(); - List<NotificationEvent> outEventIds = new ArrayList<>(); - for (NotificationEvent event : eventIds) { - // Skip the last db event. - if (event.getDbName().equalsIgnoreCase(replicatedDbName)) { - injectionPathCalled = true; - continue; - } - outEventIds.add(event); - } - - // Return the new list - return new NotificationEventResponse(outEventIds); - } else { - return null; - } - } - }; + Map<String, Long> tablesInSecDb = new HashMap<>(); + tablesInSecDb.put("t1", (long) numTxnsForSecDb); + tablesInSecDb.put("t2", (long) numTxnsForSecDb); + List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra", + tablesInSecDb, txnHandler, txnsForSecDb, primaryConf); - try { - InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdSkipper); - - // Prepare for reverse replication. - DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem(); - Path newReplDir = new Path(replica.repldDir + "reverse01"); - replicaFs.mkdirs(newReplDir); - withClause = ReplicationTestUtils.includeExternalTableClause(true); - withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'"); - - try { - replica.dump(replicatedDbName, withClause); - fail("Expected the dump to fail since the notification event is missing."); - } catch (Exception e) { - // Expected due to missing notification log entry. - } - - // Check if there is a non-recoverable error or not. 
- Path nonRecoverablePath = - TestReplicationScenarios.getNonRecoverablePath(newReplDir, replicatedDbName, replica.hiveConf); - assertTrue(replicaFs.exists(nonRecoverablePath)); - } finally { - InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour - } - } + //Open 2 txns for Primary Db + int numTxnsForPrimaryDb = 2; + List<Long> txnsForSourceDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf); + // Allocate write ids for both tables of source database. + Map<String, Long> tablesInSourceDb = new HashMap<>(); + tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb + 4); + tablesInSourceDb.put("t2", (long) numTxnsForPrimaryDb); + allocateWriteIdsForTablesAndAcquireLocks(replicatedDbName, tablesInSourceDb, txnHandler, + txnsForSourceDb, replica.getConf()); - @Test - public void testReverseBootstrap() throws Throwable { - List<String> withClause = setUpFirstIterForOptimisedBootstrap(); + //Open 1 txn with no hive locks acquired + List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf); // Do a reverse second dump, this should do a bootstrap dump for the tables in the table_diff and incremental for // rest. @@ -757,6 +502,14 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1"))); WarehouseInstance.Tuple tuple = replica.dump(replicatedDbName, withClause); + //Verify that openTxns for sourceDb were aborted before proceeding with bootstrap dump. 
+ verifyAllOpenTxnsAborted(txnsForSourceDb, primaryConf); + verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf); + verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf); + txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb)); + txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks)); + releaseLocks(txnHandler, lockIdsForSecDb); + String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; // _bootstrap directory should be created as bootstrap enabled on external tables. Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/" + EximUtil.METADATA_PATH_NAME +"/" + replicatedDbName); @@ -950,7 +703,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst // Create some partitioned and non partitioned tables and do a dump & load. WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) - .run("create table t1 (id int)") + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") .run("insert into table t1 values (1)") .run("insert into table t1 values (2),(3),(4)") .run("create table t2 (id int)") @@ -968,14 +722,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst .run("repl status " + replicatedDbName) .verifyResult(tuple.lastReplicationId) .run("use " + replicatedDbName) - .run("show tables like 't1'") - .verifyResult("t1") - .run("show tables like 't2'") - .verifyResult("t2") - .run("show tables like 't3'") - .verifyResult("t3") - .run("show tables like 't4'") - .verifyResult("t4") + .run("show tables") + .verifyResults(new String[]{"t1", "t2", "t3", "t4"}) .verifyReplTargetProperty(replicatedDbName); // Prepare for reverse bootstrap. @@ -1083,7 +831,10 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst // Create 4 managed tables and do a dump & load. 
WarehouseInstance.Tuple tuple = - primary.run("use " + primaryDbName).run("create table t1 (id int)").run("insert into table t1 values (1)") + primary.run("use " + primaryDbName) + .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into table t1 values (1)") .run("insert into table t1 values (2),(3),(4)") .run("create table t2 (place string) partitioned by (country string)") .run("insert into table t2 partition(country='india') values ('chennai')") @@ -1100,7 +851,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst .verifyResult("t3").run("show tables like 't4'").verifyResult("t4").verifyReplTargetProperty(replicatedDbName); // Do some modifications on original source cluster. The diff becomes(tnew_managed, t1, t2, t3) - primary.run("use " + primaryDbName).run("create table tnew_managed (id int)") + primary.run("use " + primaryDbName).run("create table tnew_managed (id int) clustered by(id) into 3 buckets " + + "stored as orc tblproperties (\"transactional\"=\"true\")") .run("insert into table t1 values (25)").run("insert into table tnew_managed values (110)") .run("insert into table t2 partition(country='france') values ('lyon')").run("drop table t3") .run("alter database "+ primaryDbName + " set DBPROPERTIES ('key1'='value1')"); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java index c9f4753ba99..8710e2c70a0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java @@ -19,10 +19,17 @@ package org.apache.hadoop.hive.ql.parse; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -31,6 +38,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import javax.annotation.Nullable; import java.io.BufferedReader; import java.io.File; @@ -39,7 +47,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -47,12 +54,22 @@ import java.util.Map; import java.util.Set; import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION; +import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE; +import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Test replication scenarios with staging on replica. 
*/ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcrossInstances { + String extraPrimaryDb; + @BeforeClass public static void classLevelSetup() throws Exception { Map<String, String> overrides = new HashMap<>(); @@ -68,6 +85,7 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr @Before public void setup() throws Throwable { super.setup(); + extraPrimaryDb = "extra_" + primaryDbName; } @After @@ -75,6 +93,278 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr super.tearDown(); } + @Test + public void testTargetEventIdGenerationAfterFirstIncrementalInOptFailover() throws Throwable { + List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'"); + + // Do a bootstrap cycle(A->B) + primary.dump(primaryDbName, withClause); + replica.load(replicatedDbName, primaryDbName, withClause); + + // Add some table & do an incremental dump. + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create external table table1 (id int)") + .run("insert into table table1 values (100)") + .run("create table table1_managed (name string)") + .run("insert into table table1_managed values ('ABC')") + .dump(primaryDbName, withClause); + + // Do an incremental load + replica.load(replicatedDbName, primaryDbName, withClause); + + // Get the latest notification from the notification log for the target database, just after replication. + CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId(); + + // Check the tables are there post incremental load. 
+ replica.run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("select id from table1") + .verifyResult("100") + .run("select name from table1_managed") + .verifyResult("ABC") + .verifyReplTargetProperty(replicatedDbName); + + // Do some modifications on the source cluster, so we have some entries in the table diff. + primary.run("use " + primaryDbName) + .run("create table table2_managed (id string)") + .run("insert into table table1_managed values ('SDC')") + .run("insert into table table2_managed values ('A'),('B'),('C')"); + + + // Do some modifications in another database to have unrelated events as well after the last load, which should + // get filtered. + + primary.run("create database " + extraPrimaryDb) + .run("use " + extraPrimaryDb) + .run("create external table t1 (id int)") + .run("insert into table t1 values (15),(1),(96)") + .run("create table t1_managed (id string)") + .run("insert into table t1_managed values ('SA'),('PS')"); + + // Do some modifications on the target database. + replica.run("use " + replicatedDbName) + .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('key1'='value1')") + .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('key2'='value2')"); + + // Validate the current replication id on original target has changed now. + assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId()); + + // Prepare for reverse replication. + DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem(); + Path newReplDir = new Path(replica.repldDir + "reverse1"); + replicaFs.mkdirs(newReplDir); + withClause = ReplicationTestUtils.includeExternalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'"); + + tuple = replica.dump(replicatedDbName); + + // Check event ack file should get created. 
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist", + replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE))); + + // Get the target event id. + NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf) + .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), -1, + new DatabaseAndTableFilter(replicatedDbName, null)); + + // There should be 2 events, two custom alter operations. + assertEquals(2, nl.getEvents().size()); + } + + @Test + public void testTargetEventIdGenerationInOptmisedFailover() throws Throwable { + // Do a cycle of bootstrap dump & load. + List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'"); + + // Do a bootstrap cycle(A->B) + primary.dump(primaryDbName, withClause); + replica.load(replicatedDbName, primaryDbName, withClause); + + // Add some table & do the first incremental dump. + primary.run("use " + primaryDbName) + .run("create external table tablei1 (id int)") + .run("create external table tablei2 (id int)") + .run("create table tablem1 (id int)") + .run("create table tablem2 (id int)") + .run("insert into table tablei1 values(1),(2),(3),(4)") + .run("insert into table tablei2 values(10),(20),(30),(40)") + .run("insert into table tablem1 values(5),(10),(15),(20)") + .run("insert into table tablem2 values(6),(12),(18),(24)") + .dump(primaryDbName, withClause); + + // Do the incremental load, and check everything is intact. 
+ replica.load(replicatedDbName, primaryDbName, withClause) + .run("use "+ replicatedDbName) + .run("select id from tablei1") + .verifyResults(new String[]{"1","2","3","4"}) + .run("select id from tablei2") + .verifyResults(new String[]{"10","20","30","40"}) + .run("select id from tablem1") + .verifyResults(new String[]{"5","10","15","20"}) + .run("select id from tablem2") + .verifyResults(new String[]{"6","12","18","24"}); + + // Do some modifications & call for the second cycle of incremental dump & load. + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create external table table1 (id int)") + .run("insert into table table1 values (25),(35),(82)") + .run("create table table1_managed (name string)") + .run("insert into table table1_managed values ('CAD'),('DAS'),('MSA')") + .run("insert into table tablei1 values(15),(62),(25),(62)") + .run("insert into table tablei2 values(10),(22),(11),(22)") + .run("insert into table tablem1 values(5),(10),(15),(20)") + .run("alter table table1 set TBLPROPERTIES('comment'='abc')") + .dump(primaryDbName, withClause); + + // Do an incremental load + replica.load(replicatedDbName, primaryDbName, withClause); + + // Get the latest notification from the notification log for the target database, just after replication. + CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId(); + + // Check the tables are there post incremental load. + replica.run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("select id from table1") + .verifyResults(new String[]{"25", "35", "82"}) + .run("select name from table1_managed") + .verifyResults(new String[]{"CAD", "DAS", "MSA"}) + .verifyReplTargetProperty(replicatedDbName); + + // Do some modifications on the source cluster, so we have some entries in the table diff. 
+ primary.run("use " + primaryDbName) + .run("create table table2_managed (id string)") + .run("insert into table table1_managed values ('AAA'),('BBB')") + .run("insert into table table2_managed values ('A1'),('B1'),('C2')"); + + + // Do some modifications in another database to have unrelated events as well after the last load, which should + // get filtered. + + primary.run("create database " + extraPrimaryDb) + .run("use " + extraPrimaryDb) + .run("create external table table1 (id int)") + .run("insert into table table1 values (15),(1),(96)") + .run("create table table1_managed (id string)") + .run("insert into table table1_managed values ('SAA'),('PSA')"); + + // Do some modifications on the target database. + replica.run("use " + replicatedDbName) + .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('repl1'='value1')") + .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('repl2'='value2')"); + + // Validate the current replication id on original target has changed now. + assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId()); + + // Prepare for reverse replication. + DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem(); + Path newReplDir = new Path(replica.repldDir + "reverse01"); + replicaFs.mkdirs(newReplDir); + withClause = ReplicationTestUtils.includeExternalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'"); + + tuple = replica.dump(replicatedDbName, withClause); + + // Check event ack file should get created. + assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist", + replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE))); + + // Get the target event id. 
+ NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf) + .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), 10, + new DatabaseAndTableFilter(replicatedDbName, null)); + + // No target-db events should exist beyond the event id recorded in the event ack file. + assertEquals(0, nl.getEventsSize()); + } + + @Test + public void testTargetEventIdWithNotificationsExpiredInOptimisedFailover() throws Throwable { + // Do a cycle of bootstrap dump & load. + List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'"); + + // Do a bootstrap cycle(A->B) + primary.dump(primaryDbName, withClause); + replica.load(replicatedDbName, primaryDbName, withClause); + + // Add some table & do the first incremental dump. + primary.run("use " + primaryDbName) + .run("create external table tablei1 (id int)") + .run("create table tablem1 (id int)") + .run("insert into table tablei1 values(1),(2),(3),(4)") + .run("insert into table tablem1 values(5),(10),(15),(20)") + .dump(primaryDbName, withClause); + + // Do the incremental load, and check everything is intact. + replica.load(replicatedDbName, primaryDbName, withClause) + .run("use "+ replicatedDbName) + .run("select id from tablei1") + .verifyResults(new String[]{"1","2","3","4"}) + .run("select id from tablem1") + .verifyResults(new String[]{"5","10","15","20"}); + + // Explicitly make the notification logs. + // Get the latest notification from the notification log for the target database, just after replication. + CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId(); + // Inject a behaviour where some events are missing from the notification_log table. + // This ensures the incremental dump doesn't get all events for replication. 
+ InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse> + eventIdSkipper = + new InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>() { + + @Nullable + @Override + public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) { + if (null != eventIdList) { + List<NotificationEvent> eventIds = eventIdList.getEvents(); + List<NotificationEvent> outEventIds = new ArrayList<>(); + for (NotificationEvent event : eventIds) { + // Skip all events of the replicated db to simulate expired/missing notification log entries. + if (event.getDbName().equalsIgnoreCase(replicatedDbName)) { + injectionPathCalled = true; + continue; + } + outEventIds.add(event); + } + + // Return the new list + return new NotificationEventResponse(outEventIds); + } else { + return null; + } + } + }; + + try { + InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdSkipper); + + // Prepare for reverse replication. + DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem(); + Path newReplDir = new Path(replica.repldDir + "reverse01"); + replicaFs.mkdirs(newReplDir); + withClause = ReplicationTestUtils.includeExternalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'"); + + try { + replica.dump(replicatedDbName, withClause); + fail("Expected the dump to fail since the notification event is missing."); + } catch (Exception e) { + // Expected due to missing notification log entry. + } + + // Check if there is a non-recoverable error or not. 
+ Path nonRecoverablePath = + TestReplicationScenarios.getNonRecoverablePath(newReplDir, replicatedDbName, replica.hiveConf); + assertTrue(replicaFs.exists(nonRecoverablePath)); + } finally { + InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour + } + } + @Test public void testDistCpCopyWithRemoteStagingAndCopyTaskOnTarget() throws Throwable { List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir, true); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index b76354eb459..667ede3ca74 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -1088,9 +1088,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { // of the ACID tables might be included for bootstrap during incremental dump. For old policy, it's because the table // may not satisfy the old policy but satisfy the new policy. For filter, it may happen that the table // is renamed and starts satisfying the policy. - return ((!work.replScope.includeAllTables()) - || (previousReplScopeModified()) - || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)); + return !work.replScope.includeAllTables() || previousReplScopeModified() || !tablesForBootstrap.isEmpty() + || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES); } private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, Path cmRoot, Hive db) throws Exception {