This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new bd8e4052066 HIVE-26285: Overwrite database metadata on original source in optimised failover. (Haymant Mangla reviewed by Denys Kuzmenko and Peter Vary) (#3346)
bd8e4052066 is described below

commit bd8e4052066e0ea9294defd6d4e87094c667b846
Author: Haymant Mangla <79496857+hmangl...@users.noreply.github.com>
AuthorDate: Thu Jun 9 13:11:05 2022 +0530

    HIVE-26285: Overwrite database metadata on original source in optimised failover. (Haymant Mangla reviewed by Denys Kuzmenko and Peter Vary) (#3346)
---
 .../parse/TestReplicationOptimisedBootstrap.java   | 13 +++++-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  5 +--
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     | 50 +++++++++++++++++++---
 3 files changed, 57 insertions(+), 11 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 5ccd74f3708..5bd6ac3d362 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -753,11 +753,13 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
 
     // Do a reverse second dump, this should do a bootstrap dump for the tables in the table_diff and incremental for
     // rest.
+
+    assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
     WarehouseInstance.Tuple tuple = replica.dump(replicatedDbName, withClause);
     String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
 
     // _bootstrap directory should be created as bootstrap enabled on external tables.
-    Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/metadata/" + replicatedDbName);
+    Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/" + EximUtil.METADATA_PATH_NAME +"/" + replicatedDbName);
     FileStatus[] listStatus = dumpPath1.getFileSystem(conf).listStatus(dumpPath1);
     ArrayList<String> tablesBootstrapped = new ArrayList<String>();
     for (FileStatus file : listStatus) {
@@ -769,6 +771,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
     // Do a reverse load, this should do a bootstrap load for the tables in table_diff and incremental for the rest.
     primary.load(primaryDbName, replicatedDbName, withClause);
 
+    assertFalse("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
+
     primary.run("use " + primaryDbName)
         .run("select id from t1")
         .verifyResults(new String[] { "1", "2", "3", "4", "101", "210", "321" })
@@ -898,6 +902,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
 
     // Check the properties on the new target database.
     assertTrue(targetParams.containsKey(TARGET_OF_REPLICATION));
+    assertTrue(targetParams.containsKey(CURR_STATE_ID_TARGET.toString()));
+    assertTrue(targetParams.containsKey(CURR_STATE_ID_SOURCE.toString()));
     assertFalse(targetParams.containsKey(SOURCE_OF_REPLICATION));
 
     // Check the properties on the new source database.
@@ -1096,7 +1102,10 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
     // Do some modifications on original source cluster. The diff becomes(tnew_managed, t1, t2, t3)
     primary.run("use " + primaryDbName).run("create table tnew_managed (id int)")
         .run("insert into table t1 values (25)").run("insert into table tnew_managed values (110)")
-        .run("insert into table t2 partition(country='france') values ('lyon')").run("drop table t3");
+        .run("insert into table t2 partition(country='france') values ('lyon')").run("drop table t3")
+        .run("alter database "+ primaryDbName + " set DBPROPERTIES ('key1'='value1')");
+
+    assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
 
     // Do some modifications on the target cluster. (t1, t2, t3: bootstrap & t4, t5: incremental)
     replica.run("use " + replicatedDbName).run("insert into table t1 values (101)")
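The assertions above depend on the reverse dump's layout: tables selected for bootstrap are written as per-table subdirectories under <hiveDumpDir>/_bootstrap/metadata/<dbName>, and the test enumerates that directory to build tablesBootstrapped. A minimal standalone sketch of the same directory walk, with the hardcoded "_bootstrap" and "metadata" segments standing in for INC_BOOTSTRAP_ROOT_DIR_NAME and EximUtil.METADATA_PATH_NAME (the class and method names here are illustrative, not part of the patch):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListBootstrappedTables {
  // Collects the table names bootstrapped by a reverse dump by listing the
  // per-table subdirectories under <hiveDumpDir>/_bootstrap/metadata/<dbName>.
  static List<String> bootstrappedTables(Configuration conf, String hiveDumpDir, String dbName)
      throws IOException {
    Path dbPath = new Path(hiveDumpDir, "_bootstrap/metadata/" + dbName);
    FileSystem fs = dbPath.getFileSystem(conf);
    List<String> tables = new ArrayList<>();
    for (FileStatus file : fs.listStatus(dbPath)) {
      tables.add(file.getPath().getName()); // each subdirectory is one bootstrapped table
    }
    return tables;
  }
}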
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index bc141943131..b76354eb459 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -245,9 +245,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern, getHive());
       // Get the last replicated event id from the database with respect to target.
       String targetDbEventId = getTargetEventId(work.dbNameOrPattern, getHive());
-      // Check if the tableDiff directory is present or not.
-      boolean isTableDiffDirectoryPresent =
-          checkFileExists(currentDumpPath, conf, TABLE_DIFF_COMPLETE_DIRECTORY);
 
       LOG.info("Creating event_ack file for database {} with event id {}.", work.dbNameOrPattern, dbEventId);
       lastReplId =
@@ -274,6 +271,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       // Generate the bootstrapped table list and put it in the new dump directory for the load to consume.
       createBootstrapTableList(currentDumpPath, tablesForBootstrap, conf);
+      dumpDbMetadata(work.dbNameOrPattern, new Path(hiveDumpRoot, EximUtil.METADATA_PATH_NAME),
+          fromEventId, getHive());
 
       // Call the normal dump with the tablesForBootstrap set.
       lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, getHive());
     }
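The dump-side change removes the now-unused tableDiff check and adds the dumpDbMetadata(...) call, so the second cycle of the reverse dump also serializes the source database object under the dump's metadata path; the subsequent load can then compare it against the current target database. A rough sketch of the idea only: Hive writes the full Database object through its repl metadata writers, whereas this stand-in persists just the parameter map as key=value lines, and every name below is illustrative:

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DbMetadataDump {
  // Illustrative stand-in for dumpDbMetadata(): persists a database's parameters
  // under <dumpRoot>/metadata/<dbName>/_metadata so a later load can read back
  // the pre-failover state and diff it against the live database.
  static void dumpDbParams(Configuration conf, Path dumpRoot, String dbName,
      Map<String, String> dbParams) throws IOException {
    Path metadataFile = new Path(dumpRoot, "metadata/" + dbName + "/_metadata");
    FileSystem fs = metadataFile.getFileSystem(conf);
    try (BufferedWriter out = new BufferedWriter(
        new OutputStreamWriter(fs.create(metadataFile, true), StandardCharsets.UTF_8))) {
      for (Map.Entry<String, String> e : dbParams.entrySet()) {
        out.write(e.getKey() + "=" + e.getValue()); // one parameter per line
        out.newLine();
      }
    }
  }
}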
"" : actualVal); + } + } + AlterDatabaseSetOwnerDesc alterDbDesc = new AlterDatabaseSetOwnerDesc(sourceDb.getName(), + new PrincipalDesc(sourceDb.getOwnerName(), sourceDb.getOwnerType()), null); + DDLWork ddlWork = new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc, true, + (new Path(work.dumpDirectory)).getParent().toString(), work.getMetricCollector()); + if (this.childTasks == null) { + this.childTasks = new ArrayList<>(); + } + this.childTasks.add(TaskFactory.get(ddlWork, conf)); } if (!MetaStoreUtils.isTargetOfReplication(targetDb)) { props.put(ReplConst.TARGET_OF_REPLICATION, ReplConst.TRUE); @@ -825,4 +848,19 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { LOG.info("REPL_INCREMENTAL_LOAD stage duration : {} ms", currentTimestamp - loadStartTime); return 0; } + + private Database getSourceDbMetadata() throws IOException, SemanticException { + Path dbMetadata = new Path(work.dumpDirectory, EximUtil.METADATA_PATH_NAME); + BootstrapEventsIterator itr = new BootstrapEventsIterator(dbMetadata.toString(), work.dbNameToLoadIn, + true, conf, work.getMetricCollector()); + if (!itr.hasNext()) { + throw new SemanticException("Unable to find source db metadata in " + dbMetadata.toString()); + } + BootstrapEvent next = itr.next(); + if (!next.eventType().equals(BootstrapEvent.EventType.Database)) { + throw new SemanticException("Invalid eventType: " + next.eventType() + " encountered while fetching " + + "source db metadata from " + dbMetadata.toString()); + } + return ((DatabaseEvent) next).dbInMetadata(work.dbNameToLoadIn); + } }