This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 86f94bf  HIVE-25819: Track event id on target cluster with respect to source cluster. (#2890). (Ayush Saxena, reviewed by Pravin Kumar Sinha)
86f94bf is described below

commit 86f94bf1df0db4b5e1c2a46d51d294c882ddb1f3
Author: Ayush Saxena <ayushsax...@apache.org>
AuthorDate: Tue Feb 1 09:40:33 2022 +0530

    HIVE-25819: Track event id on target cluster with respect to source cluster. (#2890). (Ayush Saxena, reviewed by Pravin Kumar Sinha)
---
 .../hive/hcatalog/api/repl/ReplicationUtils.java   |   2 +-
 .../parse/TestReplicationOptimisedBootstrap.java   | 284 +++++++++++++++++++++
 .../hive/ql/parse/TestReplicationScenarios.java    |   6 +-
 .../parse/TestReplicationScenariosAcidTables.java  |   7 -
 .../hive/ql/exec/repl/OptimisedBootstrapUtils.java |  38 ++-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |   8 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |   6 +-
 .../ql/exec/repl/bootstrap/load/LoadDatabase.java  |   9 +-
 .../incremental/IncrementalLoadTasksBuilder.java   |   9 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java      |   2 +-
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java |   5 +-
 .../hadoop/hive/ql/parse/ReplicationSpec.java      |  29 ++-
 .../hive/ql/parse/repl/dump/io/DBSerializer.java   |   2 +-
 .../ql/parse/repl/dump/io/FunctionSerializer.java  |   2 +-
 .../ql/parse/repl/dump/io/PartitionSerializer.java |   2 +-
 .../ql/parse/repl/dump/io/TableSerializer.java     |   2 +-
 .../repl/load/message/AlterDatabaseHandler.java    |   3 +-
 .../apache/hadoop/hive/common/repl/ReplConst.java  |   5 +
 .../apache/hadoop/hive/metastore/HMSHandler.java   |  28 ++
 19 files changed, 393 insertions(+), 56 deletions(-)

diff --git a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java
index 69e3c13..7514b1a 100644
--- a/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java
+++ b/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/repl/ReplicationUtils.java
@@ -41,7 +41,7 @@ import java.util.Map;
 public class ReplicationUtils {
 
-  public final static String REPL_STATE_ID = ReplicationSpec.KEY.CURR_STATE_ID.toString();
+  public final static String REPL_STATE_ID = ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString();
 
   private ReplicationUtils(){
     // dummy private constructor, since this class is a collection of static utility methods.
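
Context for the diffs below: the event_ack file written at the start of a reverse (optimised bootstrap) dump now records a pair of event ids, "<sourceEventId>#<targetEventId>", instead of a single source id, joined by OptimisedBootstrapUtils.FILE_ENTRY_SEPARATOR ("#"). A minimal standalone sketch of that layout follows; it is illustrative only, not part of the commit, and the class name and temp-file usage are assumptions:

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class EventAckSketch {
      // Mirrors OptimisedBootstrapUtils.FILE_ENTRY_SEPARATOR in the diff below.
      static final String FILE_ENTRY_SEPARATOR = "#";

      // Encode "<source event id>#<target event id>", as createAndGetEventAckFile now writes.
      static String encode(String sourceEventId, String targetEventId) {
        return sourceEventId + FILE_ENTRY_SEPARATOR + targetEventId;
      }

      // Decode: index 0 is the source-side id, index 1 the target-side id,
      // matching the getEventIdFromFile(...)[0] and [1] usages in the diff.
      static String[] decode(Path eventAckFile) throws Exception {
        String content = new String(Files.readAllBytes(eventAckFile), StandardCharsets.UTF_8);
        return content.replaceAll(System.lineSeparator(), "").trim().split(FILE_ENTRY_SEPARATOR);
      }

      public static void main(String[] args) throws Exception {
        Path f = Files.createTempFile("event_ack", null);
        Files.write(f, encode("1021", "2465").getBytes(StandardCharsets.UTF_8));
        String[] ids = decode(f);
        System.out.println("source=" + ids[0] + ", target=" + ids[1]); // source=1021, target=2465
      }
    }
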
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index d5b819d..a63bde6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -21,7 +21,13 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -31,20 +37,25 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -450,4 +461,277 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
     assertTrue("Table Diff Contains " + tableDiffEntries,
         tableDiffEntries.containsAll(Arrays.asList("t1_managed", "t2_managed")));
   }
+
+  @Test
+  public void testTargetEventIdGenerationAfterFirstIncremental() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle (A->B).
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some tables & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table table1 (id int)")
+        .run("insert into table table1 values (100)")
+        .run("create table table1_managed (name string)")
+        .run("insert into table table1_managed values ('ABC')")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load.
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Get the latest notification from the notification log for the target database, just after replication.
+    CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+
+    // Check the tables are there post incremental load.
+    replica.run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("select id from table1")
+        .verifyResult("100")
+        .run("select name from table1_managed")
+        .verifyResult("ABC")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on the source cluster, so we have some entries in the table diff.
+    primary.run("use " + primaryDbName)
+        .run("create table table2_managed (id string)")
+        .run("insert into table table1_managed values ('SDC')")
+        .run("insert into table table2_managed values ('A'),('B'),('C')");
+
+    // Do some modifications in another database to have unrelated events as well after the last load, which should
+    // get filtered.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (15),(1),(96)")
+        .run("create table t1_managed (id string)")
+        .run("insert into table t1_managed values ('SA'),('PS')");
+
+    // Do some modifications on the target database.
+    replica.run("use " + replicatedDbName)
+        .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('key1'='value1')")
+        .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('key2'='value2')");
+
+    // Validate the current replication id on the original target has changed now.
+    assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "reverse1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+    tuple = replica.dump(replicatedDbName);
+
+    // Check that the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Get the target event id.
+    NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
+        .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), -1,
+            new DatabaseAndTableFilter(replicatedDbName, null));
+
+    // There should be 4 events: one for the alter db, a second to remove first incremental pending, and then the
+    // two custom alter operations.
+    assertEquals(4, nl.getEvents().size());
+  }
+
+  @Test
+  public void testTargetEventIdGeneration() throws Throwable {
+    // Do a cycle of bootstrap dump & load.
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+    // Do a bootstrap cycle (A->B).
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some tables & do the first incremental dump.
+    primary.run("use " + primaryDbName)
+        .run("create external table tablei1 (id int)")
+        .run("create external table tablei2 (id int)")
+        .run("create table tablem1 (id int)")
+        .run("create table tablem2 (id int)")
+        .run("insert into table tablei1 values(1),(2),(3),(4)")
+        .run("insert into table tablei2 values(10),(20),(30),(40)")
+        .run("insert into table tablem1 values(5),(10),(15),(20)")
+        .run("insert into table tablem2 values(6),(12),(18),(24)")
+        .dump(primaryDbName, withClause);
+
+    // Do the incremental load, and check everything is intact.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use "+ replicatedDbName)
+        .run("select id from tablei1")
+        .verifyResults(new String[]{"1","2","3","4"})
+        .run("select id from tablei2")
+        .verifyResults(new String[]{"10","20","30","40"})
+        .run("select id from tablem1")
+        .verifyResults(new String[]{"5","10","15","20"})
+        .run("select id from tablem2")
+        .verifyResults(new String[]{"6","12","18","24"});
+
+    // Do some modifications & call for the second cycle of incremental dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table table1 (id int)")
+        .run("insert into table table1 values (25),(35),(82)")
+        .run("create table table1_managed (name string)")
+        .run("insert into table table1_managed values ('CAD'),('DAS'),('MSA')")
+        .run("insert into table tablei1 values(15),(62),(25),(62)")
+        .run("insert into table tablei2 values(10),(22),(11),(22)")
+        .run("insert into table tablem1 values(5),(10),(15),(20)")
+        .run("alter table table1 set TBLPROPERTIES('comment'='abc')")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load.
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Get the latest notification from the notification log for the target database, just after replication.
+    CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+
+    // Check the tables are there post incremental load.
+    replica.run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("select id from table1")
+        .verifyResults(new String[]{"25", "35", "82"})
+        .run("select name from table1_managed")
+        .verifyResults(new String[]{"CAD", "DAS", "MSA"})
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on the source cluster, so we have some entries in the table diff.
+    primary.run("use " + primaryDbName)
+        .run("create table table2_managed (id string)")
+        .run("insert into table table1_managed values ('AAA'),('BBB')")
+        .run("insert into table table2_managed values ('A1'),('B1'),('C2')");
+
+    // Do some modifications in another database to have unrelated events as well after the last load, which should
+    // get filtered.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table table1 (id int)")
+        .run("insert into table table1 values (15),(1),(96)")
+        .run("create table table1_managed (id string)")
+        .run("insert into table table1_managed values ('SAA'),('PSA')");
+
+    // Do some modifications on the target database.
+ replica.run("use " + replicatedDbName) + .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('repl1'='value1')") + .run("alter database "+ replicatedDbName + " set DBPROPERTIES ('repl2'='value2')"); + + // Validate the current replication id on original target has changed now. + assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId()); + + // Prepare for reverse replication. + DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem(); + Path newReplDir = new Path(replica.repldDir + "reverse01"); + replicaFs.mkdirs(newReplDir); + withClause = ReplicationTestUtils.includeExternalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'"); + + tuple = replica.dump(replicatedDbName, withClause); + + // Check event ack file should get created. + assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist", + replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE))); + + // Get the target event id. + NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf) + .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), 10, + new DatabaseAndTableFilter(replicatedDbName, null)); + + assertEquals(1, nl.getEvents().size()); + } + + @Test + public void testTargetEventIdWithNotificationsExpired() throws Throwable { + // Do a a cycle of bootstrap dump & load. + List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); + withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'"); + + // Do a bootstrap cycle(A->B) + primary.dump(primaryDbName, withClause); + replica.load(replicatedDbName, primaryDbName, withClause); + + // Add some table & do the first incremental dump. + primary.run("use " + primaryDbName) + .run("create external table tablei1 (id int)") + .run("create table tablem1 (id int)") + .run("insert into table tablei1 values(1),(2),(3),(4)") + .run("insert into table tablem1 values(5),(10),(15),(20)") + .dump(primaryDbName, withClause); + + // Do the incremental load, and check everything is intact. + replica.load(replicatedDbName, primaryDbName, withClause) + .run("use "+ replicatedDbName) + .run("select id from tablei1") + .verifyResults(new String[]{"1","2","3","4"}) + .run("select id from tablem1") + .verifyResults(new String[]{"5","10","15","20"}); + + // Explicitly make the notification logs. + // Get the latest notification from the notification log for the target database, just after replication. + CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId(); + // Inject a behaviour where some events missing from notification_log table. + // This ensures the incremental dump doesn't get all events for replication. + InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse> + eventIdSkipper = + new InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>() { + + @Nullable + @Override + public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) { + if (null != eventIdList) { + List<NotificationEvent> eventIds = eventIdList.getEvents(); + List<NotificationEvent> outEventIds = new ArrayList<>(); + for (NotificationEvent event : eventIds) { + // Skip the last db event. 
+                if (event.getDbName().equalsIgnoreCase(replicatedDbName)) {
+                  injectionPathCalled = true;
+                  continue;
+                }
+                outEventIds.add(event);
+              }
+
+              // Return the new list.
+              return new NotificationEventResponse(outEventIds);
+            } else {
+              return null;
+            }
+          }
+        };
+
+    try {
+      InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdSkipper);
+
+      // Prepare for reverse replication.
+      DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+      Path newReplDir = new Path(replica.repldDir + "reverse01");
+      replicaFs.mkdirs(newReplDir);
+      withClause = ReplicationTestUtils.includeExternalTableClause(true);
+      withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+      try {
+        replica.dump(replicatedDbName, withClause);
+        fail("Expected the dump to fail since the notification event is missing.");
+      } catch (Exception e) {
+        // Expected due to missing notification log entry.
+      }
+
+      // Check whether a non-recoverable error is reported.
+      Path nonRecoverablePath =
+          TestReplicationScenarios.getNonRecoverablePath(newReplDir, replicatedDbName, replica.hiveConf);
+      assertTrue(replicaFs.exists(nonRecoverablePath));
+    } finally {
+      InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
+    }
+  }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 61f61a8..b91191e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -81,7 +81,6 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.StringAppender;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.apache.hadoop.hive.ql.parse.repl.load.metric.BootstrapLoadMetricCollector;
@@ -132,9 +131,6 @@ import java.util.Base64;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
 import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL;
@@ -4884,7 +4880,7 @@ public class TestReplicationScenarios {
     assertTrue(Long.parseLong(lastReplDumpId) > Long.parseLong(lastDbReplDumpId));
 
     Table tbl = metaStoreClientMirror.getTable(replDbName, tblName);
-    String tblLastReplId = tbl.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+    String tblLastReplId = tbl.getParameters().get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
     assertTrue(Long.parseLong(tblLastReplId) > Long.parseLong(lastDbReplDumpId));
     assertTrue(Long.parseLong(tblLastReplId) <= Long.parseLong(lastReplDumpId));
     return lastReplDumpId;
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index b3ddbf5..2612cbf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -934,13 +934,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     assert currentEventId == lastEventId + 1;
 
     primary.run("ALTER DATABASE " + primaryDbName +
-        " SET DBPROPERTIES('" + ReplConst.TARGET_OF_REPLICATION + "'='true')");
-    lastEventId = primary.getCurrentNotificationEventId().getEventId();
-    primary.dumpFailure(primaryDbName);
-    currentEventId = primary.getCurrentNotificationEventId().getEventId();
-    assert lastEventId == currentEventId;
-
-    primary.run("ALTER DATABASE " + primaryDbName +
         " SET DBPROPERTIES('" + ReplConst.TARGET_OF_REPLICATION + "'='')");
     primary.dump(primaryDbName);
     replica.run("DROP DATABASE " + replicatedDbName);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
index 85bbbec..8221a51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
@@ -50,6 +50,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.ql.parse.ReplicationSpec.getLastReplicatedStateFromParameters;
+import static org.apache.hadoop.hive.ql.parse.ReplicationSpec.getTargetLastReplicatedStateFromParameters;
 
 /**
  * Utility class for handling operations regarding optimised bootstrap in case of replication.
@@ -58,7 +59,7 @@ public class OptimisedBootstrapUtils {
   /** Separator used to separate entries in the listing. */
   public static final String FILE_ENTRY_SEPARATOR = "#";
 
-  private static Logger LOG = LoggerFactory.getLogger(OptimisedBootstrapUtils.class);
+  private static final Logger LOG = LoggerFactory.getLogger(OptimisedBootstrapUtils.class);
 
   /** table diff directory when in progress */
  public static final String TABLE_DIFF_INPROGRESS_DIRECTORY = "table_diff";
@@ -87,20 +88,20 @@ public class OptimisedBootstrapUtils {
   }
 
   /**
-   * Gets the event id from the event ack file
+   * Gets the source & target event id from the event ack file
    * @param dumpPath the dump path
    * @param conf the hive configuration
    * @return the event id from file.
    * @throws IOException
    */
-  public static String getEventIdFromFile(Path dumpPath, HiveConf conf) throws IOException {
+  public static String[] getEventIdFromFile(Path dumpPath, HiveConf conf) throws IOException {
     String lastEventId;
     Path eventAckFilePath = new Path(dumpPath, EVENT_ACK_FILE);
     FileSystem fs = eventAckFilePath.getFileSystem(conf);
     try (FSDataInputStream stream = fs.open(eventAckFilePath);) {
       lastEventId = IOUtils.toString(stream, Charset.defaultCharset());
     }
-    return lastEventId.replaceAll(System.lineSeparator(),"").trim();
+    return lastEventId.replaceAll(System.lineSeparator(), "").trim().split(FILE_ENTRY_SEPARATOR);
   }
 
   /**
@@ -176,19 +177,20 @@ public class OptimisedBootstrapUtils {
    * @param dmd the dump metadata
    * @param cmRoot the cmRoot
    * @param dbEventId the database event id to which we have to write in the file.
-   * @param conf the hive configuraiton
+   * @param targetDbEventId the database event id with respect to the target cluster.
+   * @param conf the hive configuration
    * @param work the repldump work
    * @return the lastReplId denoting a fake dump(-1) always
    * @throws SemanticException
    */
   public static Long createAndGetEventAckFile(Path currentDumpPath, DumpMetaData dmd, Path cmRoot, String dbEventId,
-      HiveConf conf, ReplDumpWork work)
-      throws SemanticException {
+      String targetDbEventId, HiveConf conf, ReplDumpWork work) throws Exception {
     // Keep an invalid value for lastReplId, to denote it isn't an actual dump.
     Long lastReplId = -1L;
     Path filePath = new Path(currentDumpPath, EVENT_ACK_FILE);
-    Utils.writeOutput(dbEventId, filePath, conf);
-    LOG.info("Created event_ack file at {} with eventId {}", filePath, dbEventId);
+    Utils.writeOutput(dbEventId + FILE_ENTRY_SEPARATOR + targetDbEventId, filePath, conf);
+    LOG.info("Created event_ack file at {} with source eventId {} and target eventId {}", filePath, dbEventId,
+        targetDbEventId);
     work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
     dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, -1L, false);
     dmd.write(true);
@@ -245,6 +247,24 @@
     LOG.info("Completed renaming table diff progress file to table diff complete file.");
   }
 
+  /**
+   * Fetches the notification id from the database with respect to the target database.
+   * @param dbName name of the database
+   * @param hiveDb the hive object
+   * @return the corresponding notification event id from the target database
+   * @throws Exception
+   */
+  public static String getTargetEventId(String dbName, Hive hiveDb) throws Exception {
+    Database database = hiveDb.getDatabase(dbName);
+    String targetLastEventId = getTargetLastReplicatedStateFromParameters(database.getParameters());
+    List<NotificationEvent> events =
+        hiveDb.getMSC().getNextNotification(Long.parseLong(targetLastEventId) - 1, 1, null).getEvents();
+    if (events == null || events.isEmpty() || events.get(0).getEventId() != Long.parseLong(targetLastEventId)) {
+      throw new IllegalStateException("Notification events are missing in the meta store.");
+    }
+    return targetLastEventId;
+  }
+
   private static ArrayList<String> getListing(String dbName, String tableName, Hive hiveDb, HiveConf conf)
       throws HiveException, IOException {
     ArrayList<String> paths = new ArrayList<>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index a9d728d..1fcce45 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -131,6 +131,7 @@ import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.checkF
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.createAndGetEventAckFile;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getReplEventIdFromDatabase;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTargetEventId;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFailover;
 import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFirstIncrementalPending;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
@@ -229,11 +230,14 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
           isFirstIncrementalPending(work.dbNameOrPattern,
              getHive());
       // Get the last replicated event id from the database.
       String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern, getHive());
+      // Get the last replicated event id from the database with respect to the target.
+      String targetDbEventId = getTargetEventId(work.dbNameOrPattern, getHive());
       // Check if the tableDiff directory is present or not.
       boolean isTableDiffDirectoryPresent = checkFileExists(currentDumpPath, conf, TABLE_DIFF_COMPLETE_DIRECTORY);
       if (createEventMarker) {
         LOG.info("Creating event_ack file for database {} with event id {}.", work.dbNameOrPattern, dbEventId);
-        lastReplId = createAndGetEventAckFile(currentDumpPath, dmd, cmRoot, dbEventId, conf, work);
+        lastReplId =
+            createAndGetEventAckFile(currentDumpPath, dmd, cmRoot, dbEventId, targetDbEventId, conf, work);
         finishRemainingTasks();
       } else {
         // We should be here only if TableDiff is Present.
@@ -548,7 +552,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       return true;
     }
     // Event_ACK file is present; check if it contains the correct value.
-    String fileEventId = getEventIdFromFile(previousDumpPath.getParent(), conf);
+    String fileEventId = getEventIdFromFile(previousDumpPath.getParent(), conf)[0];
     String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern, getHive()).trim();
     if (!dbEventId.equalsIgnoreCase(fileEventId)) {
       // In case the database event id changed post table_diff_complete generation, that means both forward &
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 3a2dda9..2f249aa7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -598,7 +598,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     Map<String, String> dbProps;
     if (work.isIncrementalLoad()) {
       dbProps = new HashMap<>();
-      dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+      dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
           work.incrementalLoadTasksBuilder().eventTo().toString());
     } else {
       Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
@@ -699,7 +699,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     }
     boolean isTableDiffPresent =
         checkFileExists(new Path(work.dumpDirectory).getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
-    Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf));
+    Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf)[0]);
     if (!isTableDiffPresent) {
       prepareTableDiffFile(eventId, getHive(), work, conf);
       if (this.childTasks == null) {
@@ -761,7 +761,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     if (StringUtils.isNotBlank(dbName)) {
       String lastEventid = builder.eventTo().toString();
       Map<String, String> mapProp = new HashMap<>();
-      mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastEventid);
+      mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(), lastEventid);
       AlterDatabaseSetPropertiesDesc alterDbDesc =
           new AlterDatabaseSetPropertiesDesc(dbName, mapProp, new ReplicationSpec(lastEventid, lastEventid));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index ba7979d..06264f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 
 public class LoadDatabase {
@@ -141,8 +140,8 @@ public class LoadDatabase {
   private boolean isDbAlreadyBootstrapped(Database db) {
     Map<String, String> props = db.getParameters();
     return ((props != null)
-        && props.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString())
-        && !props.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()).isEmpty());
+        && props.containsKey(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString())
+        && !props.get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString()).isEmpty());
   }
 
   private boolean isDbEmpty(String dbName) throws HiveException {
@@ -181,7 +180,9 @@ public class LoadDatabase {
       last repl id is set and we create an AlterDatabaseTask at the end of processing a database.
      */
     Map<String, String> parameters = new HashMap<>(dbObj.getParameters());
-    parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+    parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
+
+    parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID_TARGET.toString());
 
     parameters.remove(ReplUtils.REPL_IS_CUSTOM_DB_LOC);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index 0eabc1c..c9f0da4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.slf4j.Logger;
-import org.stringtemplate.v4.ST;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -177,7 +176,7 @@ public class IncrementalLoadTasksBuilder {
       taskChainTail = updateIncPendTask;
 
       Map<String, String> dbProps = new HashMap<>();
-      dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent));
+      dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(), String.valueOf(lastReplayedEvent));
       ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps, dumpDirectory, metricCollector,
           shouldFailover);
       Task<?> barrierTask = TaskFactory.get(replStateLogWork, conf);
@@ -196,8 +195,8 @@ public class IncrementalLoadTasksBuilder {
   }
 
   private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
-    if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
-      String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+    if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString()))) {
+      String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
       if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
         log.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
LastReplId - " + Long.parseLong(replLastId)); @@ -242,7 +241,7 @@ public class IncrementalLoadTasksBuilder { private Task<?> dbUpdateReplStateTask(String dbName, String replState, Task<?> preCursor) { HashMap<String, String> mapProp = new HashMap<>(); - mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); + mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(), replState); AlterDatabaseSetPropertiesDesc alterDbDesc = new AlterDatabaseSetPropertiesDesc(dbName, mapProp, new ReplicationSpec(replState, replState)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 18cdab5..816153e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -841,7 +841,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } else { // verify if table has been the target of replication, and if so, check HiveConf if we're allowed // to override. If not, fail. - if (table.getParameters().containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()) + if (table.getParameters().containsKey(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString()) && conf.getBoolVar(HiveConf.ConfVars.HIVE_EXIM_RESTRICT_IMPORTS_INTO_REPLICATED_TABLES)){ throw new SemanticException(ErrorMsg.IMPORT_INTO_STRICT_REPL_TABLE.getMsg( "Table "+table.getTableName()+" has repl.last.id parameter set." )); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 0b9eb57..17472c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -55,7 +55,6 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME; -import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD; @@ -445,8 +444,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { if (database != null) { inputs.add(new ReadEntity(database)); Map<String, String> params = database.getParameters(); - if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) { - return params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString()))) { + return params.get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString()); } } } catch (HiveException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 37039c4..dd2224c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -18,14 +18,13 @@ package org.apache.hadoop.hive.ql.parse; import com.google.common.base.Function; -import com.google.common.base.Predicate; import 
 import org.apache.hadoop.hive.common.repl.ReplConst;
-import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 
-import javax.annotation.Nullable;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_TARGET_DATABASE_PROPERTY;
+
 /**
  * Statements executed to handle replication have some additional
  * information relevant to the replication subsystem - this class
@@ -59,11 +58,12 @@ public class ReplicationSpec {
   public enum KEY {
     REPL_SCOPE("repl.scope"),
     EVENT_ID("repl.event.id"),
-    CURR_STATE_ID(ReplConst.REPL_TARGET_TABLE_PROPERTY),
+    CURR_STATE_ID_SOURCE(ReplConst.REPL_TARGET_TABLE_PROPERTY),
     NOOP("repl.noop"),
     IS_REPLACE("repl.is.replace"),
     VALID_WRITEID_LIST("repl.valid.writeid.list"),
-    VALID_TXN_LIST("repl.valid.txnid.list")
+    VALID_TXN_LIST("repl.valid.txnid.list"),
+    CURR_STATE_ID_TARGET(REPL_TARGET_DATABASE_PROPERTY),
     ;
 
     private final String keyName;
@@ -146,7 +146,7 @@ public class ReplicationSpec {
       }
     }
     this.eventId = keyFetcher.apply(ReplicationSpec.KEY.EVENT_ID.toString());
-    this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+    this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
     this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
     this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString()));
     this.validWriteIdList = keyFetcher.apply(ReplicationSpec.KEY.VALID_WRITEID_LIST.toString());
@@ -230,8 +230,15 @@ public class ReplicationSpec {
   }
 
   public static String getLastReplicatedStateFromParameters(Map<String, String> m) {
-    if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID.toString()))){
-      return m.get(KEY.CURR_STATE_ID.toString());
+    if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID_SOURCE.toString()))){
+      return m.get(KEY.CURR_STATE_ID_SOURCE.toString());
+    }
+    return null;
+  }
+
+  public static String getTargetLastReplicatedStateFromParameters(Map<String, String> m) {
+    if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID_TARGET.toString()))){
+      return m.get(KEY.CURR_STATE_ID_TARGET.toString());
     }
     return null;
   }
@@ -360,7 +367,7 @@ public class ReplicationSpec {
       }
     case EVENT_ID:
       return getReplicationState();
-    case CURR_STATE_ID:
+    case CURR_STATE_ID_SOURCE:
       return getCurrentReplicationState();
     case NOOP:
       return String.valueOf(isNoop());
@@ -388,9 +395,9 @@ public class ReplicationSpec {
 
   public static void copyLastReplId(Map<String, String> srcParameter,
                                     Map<String, String> destParameter) {
-    String lastReplId = srcParameter.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+    String lastReplId = srcParameter.get(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString());
     if (lastReplId != null) {
-      destParameter.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastReplId);
+      destParameter.put(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(), lastReplId);
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
index 06b8fd5..429b9d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/DBSerializer.java
@@ -39,7 +39,7 @@ public class DBSerializer implements JsonWriter.Serializer {
   public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
       throws SemanticException, IOException {
     dbObject.putToParameters(
-        ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+        ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
         additionalPropertiesProvider.getCurrentReplicationState()
     );
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
index c340df7..fc01906 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
@@ -94,7 +94,7 @@ public class FunctionSerializer implements JsonWriter.Serializer {
       // This is required; otherwise the correct work object won't be created on repl load.
       writer.jsonGenerator.writeStringField(ReplicationSpec.KEY.REPL_SCOPE.toString(), "all");
-      writer.jsonGenerator.writeStringField(ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+      writer.jsonGenerator.writeStringField(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
           additionalPropertiesProvider.getCurrentReplicationState());
       writer.jsonGenerator
           .writeStringField(FIELD_NAME, serializer.toString(copyObj));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index 12c183e..d198612 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -57,7 +57,7 @@ public class PartitionSerializer implements JsonWriter.Serializer {
       if (additionalPropertiesProvider.getReplSpecType()
           != ReplicationSpec.Type.INCREMENTAL_DUMP) {
         partition.putToParameters(
-            ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+            ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
             additionalPropertiesProvider.getCurrentReplicationState());
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 16c371a..8c52194 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -86,7 +86,7 @@ public class TableSerializer implements JsonWriter.Serializer {
     if (additionalPropertiesProvider.getReplSpecType()
         != ReplicationSpec.Type.INCREMENTAL_DUMP) {
       table.putToParameters(
-          ReplicationSpec.KEY.CURR_STATE_ID.toString(),
+          ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString(),
           additionalPropertiesProvider.getCurrentReplicationState());
     }
   } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
index 65841c3..2194f39 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
@@ -64,7 +64,8 @@ public class AlterDatabaseHandler extends AbstractMessageHandler {
           String key = entry.getKey();
           // Ignore the keys which are local to source warehouse
           if (key.startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
-              || key.equals(ReplicationSpec.KEY.CURR_STATE_ID.toString())
+              || key.equals(ReplicationSpec.KEY.CURR_STATE_ID_SOURCE.toString())
+              || key.equals(ReplicationSpec.KEY.CURR_STATE_ID_TARGET.toString())
              || key.equals(ReplUtils.REPL_CHECKPOINT_KEY)
              || key.equals(ReplChangeManager.SOURCE_OF_REPLICATION)
              || key.equals(ReplUtils.REPL_FIRST_INC_PENDING_FLAG)
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
index 46dbc3e..f5131ca 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
@@ -53,4 +53,9 @@ public class ReplConst {
   public static final String TARGET_OF_REPLICATION = "repl.target.for";
 
   public static final String REPL_INCOMPATIBLE = "repl.incompatible";
+
+  /**
+   * Tracks the event id with respect to the target cluster.
+   */
+  public static final String REPL_TARGET_DATABASE_PROPERTY = "repl.target.last.id";
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index d186c4f..d8a3da4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -110,6 +110,7 @@ import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_PATH_SUFFI
 import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
 import static org.apache.hadoop.hive.common.AcidConstants.DELTA_DIGITS;
 
+import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_TARGET_DATABASE_PROPERTY;
 import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.TRUNCATE_SKIP_DATA_DELETION;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTAS;
 import static org.apache.hadoop.hive.metastore.ExceptionHandler.handleException;
@@ -1581,6 +1582,16 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
         throw new MetaException("Could not alter database \"" + parsedDbName[DB_NAME] +
             "\". Could not retrieve old definition.");
       }
+
+      // Add replication target event id.
+      if (isReplicationEventIdUpdate(oldDB, newDB)) {
+        Map<String, String> oldParams = new LinkedHashMap<>(newDB.getParameters());
+        String currentNotificationLogID = Long.toString(ms.getCurrentNotificationEventId().getEventId());
+        oldParams.put(REPL_TARGET_DATABASE_PROPERTY, currentNotificationLogID);
+        LOG.debug("Adding the {} property for database {} with event id {}", REPL_TARGET_DATABASE_PROPERTY,
+            newDB.getName(), currentNotificationLogID);
+        newDB.setParameters(oldParams);
+      }
       firePreEvent(new PreAlterDatabaseEvent(oldDB, newDB, this));
 
       ms.openTransaction();
@@ -1613,6 +1624,23 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
     }
   }
 
+  /**
+   * Checks whether the repl.last.id is being updated.
+   * @param oldDb the old db object
+   * @param newDb the new db object
+   * @return true if repl.last.id is being changed.
+   */
+  private boolean isReplicationEventIdUpdate(Database oldDb, Database newDb) {
+    Map<String, String> oldDbProp = oldDb.getParameters();
+    Map<String, String> newDbProp = newDb.getParameters();
+    if (newDbProp == null || newDbProp.isEmpty()) {
+      return false;
+    }
+    String newReplId = newDbProp.get(ReplConst.REPL_TARGET_TABLE_PROPERTY);
+    String oldReplId = oldDbProp != null ? oldDbProp.get(ReplConst.REPL_TARGET_TABLE_PROPERTY) : null;
+    return newReplId != null && !newReplId.equalsIgnoreCase(oldReplId);
+  }
+
   private void drop_database_core(RawStore ms, String catName,
       final String name, final boolean deleteData, final boolean cascade)
       throws NoSuchObjectException, InvalidOperationException, MetaException,
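
Taken together, the flow introduced by this commit is: HMSHandler.alter_database stamps repl.target.last.id on the target database whenever repl.last.id changes during a load, and the next reverse dump reads that property back to find the first target-side event to replicate. A rough consumer-side sketch follows; it is illustrative only, not part of the commit, assumes a HiveConf wired to the target metastore, and uses only APIs that appear in the diffs above:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
    import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;

    public class TargetEventIdSketch {
      // Property name comes from ReplConst.REPL_TARGET_DATABASE_PROPERTY above.
      static final String REPL_TARGET_LAST_ID = "repl.target.last.id";

      public static NotificationEventResponse eventsSinceLastLoad(HiveConf conf, String dbName)
          throws Exception {
        HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
        // Written by HMSHandler.alter_database whenever repl.last.id changes on the target db.
        String targetLastId = client.getDatabase(dbName).getParameters().get(REPL_TARGET_LAST_ID);
        if (targetLastId == null) {
          throw new IllegalStateException("No target event id recorded for " + dbName);
        }
        // Fetch the events that happened on the target after that id, scoped to the one
        // database (-1 maxEvents is used the same way in the tests above).
        return client.getNextNotification(Long.parseLong(targetLastId), -1,
            new DatabaseAndTableFilter(dbName, null));
      }
    }
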