This is an automated email from the ASF dual-hosted git repository. tchoi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 870713ce031 HIVE-26601: Registering table metric during second load cycle of optimized bootstrap (#3992) (Vinit Patni, reviewed by Teddy Choi) 870713ce031 is described below commit 870713ce031b346cdd9008a3217d8cc806ea9f7a Author: vinitpatni <vinitsun...@gmail.com> AuthorDate: Fri Feb 3 13:13:30 2023 +0530 HIVE-26601: Registering table metric during second load cycle of optimized bootstrap (#3992) (Vinit Patni, reviewed by Teddy Choi) --- .../parse/TestReplicationOptimisedBootstrap.java | 81 ++++++++++++++++++++++ .../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 6 +- .../incremental/IncrementalLoadTasksBuilder.java | 7 +- 3 files changed, 90 insertions(+), 4 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java index 4959bacf5ad..a55b7c8a5b4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java @@ -992,6 +992,87 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA assertEquals(tableMetric.getTotalCount(), tableDiffEntries.size()); } + @Test + public void testTblMetricRegisterDuringSecondLoadCycleOfOptimizedBootstrap() throws Throwable { + List<String> withClause = ReplicationTestUtils.includeExternalTableClause(false); + withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'"); + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table t1_managed (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into table t1_managed values (10)") + .run("insert into table t1_managed values 
(20),(31),(42)") + .dump(primaryDbName, withClause); + + // Do the bootstrap load and check all the external & managed tables are present. + replica.load(replicatedDbName, primaryDbName, withClause) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[]{"t1_managed"}) + .verifyReplTargetProperty(replicatedDbName); + + // Do an incremental dump & load, Add one table which we can drop & an empty table as well. + tuple = primary.run("use " + primaryDbName) + .run("create table t2_managed (id int) clustered by(id) into 3 buckets stored as orc " + + "tblproperties (\"transactional\"=\"true\")") + .run("insert into table t2_managed values (10)") + .run("insert into table t2_managed values (20),(31),(42)") + .dump(primaryDbName, withClause); + + replica.load(replicatedDbName, primaryDbName, withClause) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[]{"t1_managed", "t2_managed"}) + .verifyReplTargetProperty(replicatedDbName); + + primary.run("use " + primaryDbName) + .run("insert into table t1_managed values (30)") + .run("insert into table t1_managed values (50),(51),(52)"); + + // Prepare for reverse replication. + DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem(); + Path newReplDir = new Path(replica.repldDir + "1"); + replicaFs.mkdirs(newReplDir); + withClause = ReplicationTestUtils.includeExternalTableClause(false); + withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'"); + + + // Do a reverse dump + tuple = replica.dump(replicatedDbName, withClause); + + // Check the event ack file got created. 
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist", + replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE))); + + + // Do a load, this should create a table_diff_complete directory + primary.load(primaryDbName,replicatedDbName, withClause); + + // Check the table diff directory exists. + assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist", + replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY))); + + Path dumpPath = new Path(tuple.dumpLocation); + // Check the table diff has all the modified tables, including the dropped and empty ones + HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf); + assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries + .containsAll(Arrays.asList("t1_managed"))); + + isMetricsEnabledForTests(true); + replica.dump(replicatedDbName, withClause); + + //do a load on primary and verify insert queries are discarded + primary.load(primaryDbName,replicatedDbName, withClause) + .run("select id from t1_managed") + .verifyResults(new String[] { "10", "20", "31", "42" }); + MetricCollector collector = MetricCollector.getInstance(); + ReplicationMetric metric = collector.getMetrics().getLast(); + Stage stage = metric.getProgress().getStageByName("REPL_LOAD"); + Metric tableMetric = stage.getMetricByName(ReplUtils.MetricName.TABLES.name()); + assertEquals(tableMetric.getTotalCount(), tableDiffEntries.size()); + } + @NotNull private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable { List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index 2c379472d3a..60f24862112 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -150,9 +150,6 @@ public class ReplLoadWork implements Serializable, ReplLoadWorkMBean { isFirstFailover = checkFileExists(dumpDirParent, hiveConf, EVENT_ACK_FILE); isSecondFailover = !isFirstFailover && checkFileExists(dumpDirParent, hiveConf, BOOTSTRAP_TABLES_LIST); - incrementalLoadTasksBuilder = new IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory, - new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo, metricCollector, - replStatsTracker, shouldFailover); /* * If the current incremental dump also includes bootstrap for some tables, then create iterator @@ -186,6 +183,9 @@ public class ReplLoadWork implements Serializable, ReplLoadWorkMBean { this.bootstrapIterator = null; this.constraintsIterator = null; } + incrementalLoadTasksBuilder = new IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory, + new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo, metricCollector, + replStatsTracker, shouldFailover, tablesToBootstrap.size()); } else { this.bootstrapIterator = new BootstrapEventsIterator(new Path(dumpDirectory, EximUtil.METADATA_PATH_NAME) .toString(), dbNameToLoadIn, true, hiveConf, metricCollector); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index c5f6d6ed2f2..31a53054028 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -82,7 +82,7 @@ public class IncrementalLoadTasksBuilder { public IncrementalLoadTasksBuilder(String dbName, String loadPath, IncrementalLoadEventsIterator iterator, HiveConf conf, Long eventTo, ReplicationMetricCollector metricCollector, ReplStatsTracker replStatsTracker, - boolean shouldFailover) 
throws SemanticException { + boolean shouldFailover, int bootstrapTableSize) throws SemanticException { this.dbName = dbName; dumpDirectory = (new Path(loadPath).getParent()).toString(); this.iterator = iterator; @@ -102,6 +102,11 @@ public class IncrementalLoadTasksBuilder { this.metricCollector.reportFailoverStart("REPL_LOAD", metricMap, new FailoverMetaData(new Path(dumpDirectory, ReplUtils.REPL_HIVE_BASE_DIR), conf)); } else { + //Registering table metric as we do bootstrap of selective tables + // in second load cycle of optimized bootstrap + if(bootstrapTableSize > 0) { + metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) bootstrapTableSize); + } this.metricCollector.reportStageStart("REPL_LOAD", metricMap); } }