This is an automated email from the ASF dual-hosted git repository.

tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

The following commit(s) were added to refs/heads/master by this push:
     new afd2722310c  HIVE-26599: Registering Tables metric during second cycle of optimised bootstrap (Vinit Patni, reviewed by Teddy Choi)
afd2722310c is described below

commit afd2722310c712b504dff74082f9865c31d5a187
Author: vinitpatni <vinitsun...@gmail.com>
AuthorDate: Fri Jan 27 13:06:15 2023 +0530

    HIVE-26599: Registering Tables metric during second cycle of optimised bootstrap (Vinit Patni, reviewed by Teddy Choi)
---
 .../parse/TestReplicationOptimisedBootstrap.java   | 82 ++++++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  4 ++
 2 files changed, 86 insertions(+)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 396abd24b47..42ef25756ae 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -32,11 +32,17 @@ import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncod
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector;
 import org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage;
+import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric;
+import static org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector.isMetricsEnabledForTests;
 import org.jetbrains.annotations.NotNull;
 
 import org.junit.After;
@@ -906,6 +912,82 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
         .verifyFailure(new String[]{"tnew_managed"});
   }
 
+  @Test
+  public void testTblMetricRegisterDuringSecondCycleOfOptimizedBootstrap() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(false);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create table t1_managed (id int) clustered by(id) into 3 buckets stored as orc "
+            + "tblproperties (\"transactional\"=\"true\")")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[]{"t1_managed"})
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, add one table which we can drop & an empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t2_managed (id int) clustered by(id) into 3 buckets stored as orc "
+            + "tblproperties (\"transactional\"=\"true\")")
+        .run("insert into table t2_managed values (10)")
+        .run("insert into table t2_managed values (20),(31),(42)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[]{"t1_managed", "t2_managed"})
+        .verifyReplTargetProperty(replicatedDbName);
+
+    primary.run("use " + primaryDbName)
+        .run("insert into table t1_managed values (30)")
+        .run("insert into table t1_managed values (50),(51),(52)");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(false);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory exists.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    Path dumpPath = new Path(tuple.dumpLocation);
+    // Check the table diff has all the modified tables, including the dropped and empty ones.
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t1_managed")));
+
+    isMetricsEnabledForTests(true);
+    replica.dump(replicatedDbName, withClause);
+    MetricCollector collector = MetricCollector.getInstance();
+    ReplicationMetric metric = collector.getMetrics().getLast();
+    Stage stage = metric.getProgress().getStageByName("REPL_DUMP");
+    Metric tableMetric = stage.getMetricByName(ReplUtils.MetricName.TABLES.name());
+    assertEquals(tableMetric.getTotalCount(), tableDiffEntries.size());
+  }
+
   @NotNull
   private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
     List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 50146725def..2cda6b30b59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -867,6 +867,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_FAILOVER_START)) {
       work.getMetricCollector().reportFailoverStart(getName(), metricMap, work.getFailoverMetadata());
     } else {
+      int size = tablesForBootstrap.size();
+      if (size > 0) {
+        metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) tablesForBootstrap.size());
+      }
       work.getMetricCollector().reportStageStart(getName(), metricMap);
     }
     long dumpedCount = resumeFrom - work.eventFrom;
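
The ReplDumpTask hunk above is the core of the fix: when the second cycle of optimised bootstrap has tables queued for bootstrap, their count is registered under the TABLES metric before reportStageStart, which is what the new test reads back through MetricCollector. Below is a minimal, self-contained sketch of that registration logic using plain Java stand-ins; the class and method names (TablesMetricSketch, buildMetricMap) and the string metric keys are illustrative assumptions for this sketch, not Hive APIs.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Standalone sketch: mirrors the patched (non-failover) branch of ReplDumpTask
    // using plain Java stand-ins. In Hive the metric name comes from
    // ReplUtils.MetricName.TABLES and the map is passed to
    // work.getMetricCollector().reportStageStart(...).
    public class TablesMetricSketch {

      // Hypothetical stand-in for the metric map built before the stage starts.
      static Map<String, Long> buildMetricMap(List<String> tablesForBootstrap, long eventCount) {
        Map<String, Long> metricMap = new HashMap<>();
        metricMap.put("EVENTS", eventCount);
        // The fix: during the second (bootstrap) cycle of optimised bootstrap, the
        // tables selected for bootstrap are also registered under the TABLES metric,
        // so the REPL_DUMP stage reports a non-zero table total.
        if (!tablesForBootstrap.isEmpty()) {
          metricMap.put("TABLES", (long) tablesForBootstrap.size());
        }
        return metricMap;
      }

      public static void main(String[] args) {
        // Example data: tables the first reverse cycle recorded in table_diff_complete.
        List<String> tablesForBootstrap = Arrays.asList("t1_managed");
        Map<String, Long> metricMap = buildMetricMap(tablesForBootstrap, 0L);
        System.out.println(metricMap); // contains TABLES=1
      }
    }

Running the sketch prints a map containing TABLES=1, mirroring the total count the test asserts for the REPL_DUMP stage against the table_diff entries.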