This is an automated email from the ASF dual-hosted git repository.

tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 870713ce031 HIVE-26601: Registering table metric during second load cycle of optimized bootstrap (#3992) (Vinit Patni, reviewed by Teddy Choi)
870713ce031 is described below

commit 870713ce031b346cdd9008a3217d8cc806ea9f7a
Author: vinitpatni <vinitsun...@gmail.com>
AuthorDate: Fri Feb 3 13:13:30 2023 +0530

    HIVE-26601: Registering table metric during second load cycle of optimized bootstrap (#3992) (Vinit Patni, reviewed by Teddy Choi)
---
 .../parse/TestReplicationOptimisedBootstrap.java   | 81 ++++++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java     |  6 +-
 .../incremental/IncrementalLoadTasksBuilder.java   |  7 +-
 3 files changed, 90 insertions(+), 4 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 4959bacf5ad..a55b7c8a5b4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -992,6 +992,87 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
     assertEquals(tableMetric.getTotalCount(), tableDiffEntries.size());
   }
 
+  @Test
+  public void testTblMetricRegisterDuringSecondLoadCycleOfOptimizedBootstrap() throws Throwable {
+    List<String> withClause = ReplicationTestUtils.includeExternalTableClause(false);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .run("create table t1_managed (id int) clustered by(id) into 3 
buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into table t1_managed values (10)")
+            .run("insert into table t1_managed values (20),(31),(42)")
+            .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check that the managed table is present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1_managed"})
+            .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, adding one more managed table.
+    tuple = primary.run("use " + primaryDbName)
+            .run("create table t2_managed (id int) clustered by(id) into 3 
buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into table t2_managed values (10)")
+            .run("insert into table t2_managed values (20),(31),(42)")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1_managed", "t2_managed"})
+            .verifyReplTargetProperty(replicatedDbName);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into table t1_managed values (30)")
+            .run("insert into table t1_managed values (50),(51),(52)");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(false);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+            replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+
+    // Do a load; this should create a table_diff_complete directory
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory exists.
+    assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+            replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    Path dumpPath = new Path(tuple.dumpLocation);
+    // Check the table diff has all the modified tables.
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+            .containsAll(Arrays.asList("t1_managed")));
+
+    isMetricsEnabledForTests(true);
+    replica.dump(replicatedDbName, withClause);
+
+    // Do a load on primary and verify insert queries are discarded
+    primary.load(primaryDbName, replicatedDbName, withClause)
+            .run("select id from t1_managed")
+            .verifyResults(new String[] { "10", "20", "31", "42" });
+    MetricCollector collector = MetricCollector.getInstance();
+    ReplicationMetric metric = collector.getMetrics().getLast();
+    Stage stage = metric.getProgress().getStageByName("REPL_LOAD");
+    Metric tableMetric = stage.getMetricByName(ReplUtils.MetricName.TABLES.name());
+    assertEquals(tableMetric.getTotalCount(), tableDiffEntries.size());
+  }
+
   @NotNull
   private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
     List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
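
The test above pins down a single invariant: after the second (reverse) load cycle of optimized bootstrap, the TABLES metric registered under the REPL_LOAD stage must equal the number of entries in table_diff_complete. A minimal standalone sketch of that check, using illustrative names rather than the Hive test harness:

    import java.util.Set;

    public class TableMetricInvariant {
      // Throws if the registered TABLES metric disagrees with the table_diff size.
      static void check(long tableMetricTotal, Set<String> tableDiffEntries) {
        if (tableMetricTotal != tableDiffEntries.size()) {
          throw new AssertionError("TABLES metric total " + tableMetricTotal
              + " != table_diff size " + tableDiffEntries.size());
        }
      }

      public static void main(String[] args) {
        // Mirrors the test above: one modified table, one metric unit.
        check(1, Set.of("t1_managed"));
      }
    }
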
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 2c379472d3a..60f24862112 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -150,9 +150,6 @@ public class ReplLoadWork implements Serializable, ReplLoadWorkMBean {
      isFirstFailover = checkFileExists(dumpDirParent, hiveConf, EVENT_ACK_FILE);
      isSecondFailover =
          !isFirstFailover && checkFileExists(dumpDirParent, hiveConf, BOOTSTRAP_TABLES_LIST);
-      incrementalLoadTasksBuilder = new IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
-          new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo, metricCollector,
-          replStatsTracker, shouldFailover);
 
       /*
        * If the current incremental dump also includes bootstrap for some tables, then create iterator
@@ -186,6 +183,9 @@ public class ReplLoadWork implements Serializable, ReplLoadWorkMBean {
         this.bootstrapIterator = null;
         this.constraintsIterator = null;
       }
+      incrementalLoadTasksBuilder = new IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
+              new IncrementalLoadEventsIterator(dumpDirectory, hiveConf), hiveConf, eventTo, metricCollector,
+              replStatsTracker, shouldFailover, tablesToBootstrap.size());
     } else {
       this.bootstrapIterator = new BootstrapEventsIterator(new Path(dumpDirectory, EximUtil.METADATA_PATH_NAME)
               .toString(), dbNameToLoadIn, true, hiveConf, metricCollector);
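
The ReplLoadWork change is as much an ordering fix as a signature change: the IncrementalLoadTasksBuilder construction moves below the block that resolves the bootstrap table list, so tablesToBootstrap.size() is known when the builder is created. A simplified sketch of that dependency, with assumed names standing in for the ReplLoadWork internals:

    import java.util.List;

    public class BuilderOrderingSketch {
      // Stand-in for IncrementalLoadTasksBuilder; the real constructor takes many more arguments.
      static final class IncrementalBuilder {
        IncrementalBuilder(int bootstrapTableSize) {
          System.out.println("builder created with " + bootstrapTableSize + " bootstrap tables");
        }
      }

      public static void main(String[] args) {
        List<String> tablesToBootstrap = List.of("t1_managed");  // resolved first
        IncrementalBuilder builder =
            new IncrementalBuilder(tablesToBootstrap.size());    // constructed only afterwards
      }
    }
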
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index c5f6d6ed2f2..31a53054028 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -82,7 +82,7 @@ public class IncrementalLoadTasksBuilder {
 
   public IncrementalLoadTasksBuilder(String dbName, String loadPath, IncrementalLoadEventsIterator iterator,
       HiveConf conf, Long eventTo, ReplicationMetricCollector metricCollector, ReplStatsTracker replStatsTracker,
-                                     boolean shouldFailover) throws SemanticException {
+                                     boolean shouldFailover, int bootstrapTableSize) throws SemanticException {
     this.dbName = dbName;
     dumpDirectory = (new Path(loadPath).getParent()).toString();
     this.iterator = iterator;
@@ -102,6 +102,11 @@ public class IncrementalLoadTasksBuilder {
       this.metricCollector.reportFailoverStart("REPL_LOAD", metricMap,
              new FailoverMetaData(new Path(dumpDirectory, ReplUtils.REPL_HIVE_BASE_DIR), conf));
     } else {
+      // Registering table metric as we do bootstrap of selective tables
+      // in the second load cycle of optimized bootstrap
+      if (bootstrapTableSize > 0) {
+        metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) bootstrapTableSize);
+      }
       this.metricCollector.reportStageStart("REPL_LOAD", metricMap);
     }
   }
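
The builder change itself is small: when the load is not a failover and some tables are being bootstrapped, the metric map is seeded with the bootstrap table count before the stage starts, so the TABLES metric shows up for the second load cycle. A self-contained sketch of that guard, with a plain string key standing in for ReplUtils.MetricName.TABLES.name():

    import java.util.HashMap;
    import java.util.Map;

    public class MetricSeedSketch {
      // Seed the metric map only when there are tables to bootstrap.
      static Map<String, Long> seedMetrics(int bootstrapTableSize) {
        Map<String, Long> metricMap = new HashMap<>();
        if (bootstrapTableSize > 0) {
          metricMap.put("TABLES", (long) bootstrapTableSize);  // assumed key name
        }
        return metricMap;
      }

      public static void main(String[] args) {
        System.out.println(seedMetrics(1));  // {TABLES=1}
        System.out.println(seedMetrics(0));  // {}
      }
    }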
