This is an automated email from the ASF dual-hosted git repository. tchoi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 11f7ebbcad5 HIVE-26961: Fix improper replication metric count when open transactions are filtered. (#4041) (Rakshith Chandraiah, reviewed by Teddy Choi) 11f7ebbcad5 is described below commit 11f7ebbcad590fe569ce8f8588f667a6274d657b Author: Rakshith C <56068841+rakshith...@users.noreply.github.com> AuthorDate: Wed Feb 8 13:36:42 2023 +0530 HIVE-26961: Fix improper replication metric count when open transactions are filtered. (#4041) (Rakshith Chandraiah, reviewed by Teddy Choi) --- .../parse/TestReplicationScenariosAcidTables.java | 61 ++++++++++++++++++++++ .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 6 ++- .../ql/parse/repl/dump/events/EventHandler.java | 5 ++ 3 files changed, 70 insertions(+), 2 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index fda11c127e4..0dfb07f2282 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -55,6 +55,10 @@ import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import org.apache.hadoop.hive.ql.parse.repl.metric.MetricCollector; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metric; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric; +import org.apache.hadoop.hive.ql.parse.repl.metric.event.Stage; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.Utils; @@ -92,6 +96,7 @@ import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_ENABLE_BACKGROUN import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector.isMetricsEnabledForTests; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -3736,4 +3741,60 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios ReplDumpWork.testDeletePreviousDumpMetaPath(false); } + @Test + public void testEventsDumpedCountWithFilteringOfOpenTransactions() throws Throwable { + final int REPL_MAX_LOAD_TASKS = 5; + List<String> incrementalBatchConfigs = Arrays.asList( + String.format("'%s'='%s'", HiveConf.ConfVars.REPL_BATCH_INCREMENTAL_EVENTS, "true"), + String.format("'%s'='%d'", HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS, REPL_MAX_LOAD_TASKS), + String.format("'%s'='%s'", HiveConf.ConfVars.REPL_FILTER_TRANSACTIONS, "true") + ); + + WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("insert into table t1 values (1)") + .dump(primaryDbName, incrementalBatchConfigs); + + FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf); + + replica.load(replicatedDbName, primaryDbName, incrementalBatchConfigs) + .run("use " + replicatedDbName) + .run("select * from t1") + .verifyResults(new String[]{"1"}); + + isMetricsEnabledForTests(true); + MetricCollector collector = MetricCollector.getInstance(); + //incremental run + WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName) + .run("insert into t1 values(2)") + .run("insert into t1 values(3)") + .run("select * from t1") // will open a read only transaction which should be filtered. + .run("insert into t1 values(4)") + .run("insert into t1 values(5)") + .dump(primaryDbName, incrementalBatchConfigs); + + ReplicationMetric metric = collector.getMetrics().getLast(); + Stage stage = metric.getProgress().getStageByName("REPL_DUMP"); + Metric eventMetric = stage.getMetricByName(ReplUtils.MetricName.EVENTS.name()); + long eventCountFromMetrics = eventMetric.getTotalCount(); + + Path dumpPath = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path ackLastEventID = new Path(dumpPath, ReplAck.EVENTS_DUMP.toString()); + EventsDumpMetadata eventsDumpMetadata = EventsDumpMetadata.deserialize(ackLastEventID, conf); + + int eventsCountInAckFile = eventsDumpMetadata.getEventsDumpedCount(), eventCountFromStagingDir = 0; + + String eventsBatchDirPrefix = ReplUtils.INC_EVENTS_BATCH.replaceAll("%d", ""); + List<FileStatus> batchFiles = Arrays.stream(fs.listStatus(dumpPath)) + .filter(fileStatus -> fileStatus.getPath().getName() + .startsWith(eventsBatchDirPrefix)).collect(Collectors.toList()); + + for (FileStatus fileStatus : batchFiles) { + eventCountFromStagingDir += fs.listStatus(fileStatus.getPath()).length; + } + // open transactions were filtered. + assertTrue(eventCountFromStagingDir < eventCountFromMetrics); + // ensure event count is captured appropriately in EventsDumpMetadata. + assertEquals(eventsCountInAckFile, eventCountFromStagingDir); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 3e3ae8e18d6..e0b58a64493 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -1254,8 +1254,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { ); EventHandler eventHandler = EventHandlerFactory.handlerFor(ev); eventHandler.handle(context); - eventsDumpMetadata.incrementEventsDumpedCount(); - work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.EVENTS.name(), 1); + if (context.isDmdCreated()) { + eventsDumpMetadata.incrementEventsDumpedCount(); + work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.EVENTS.name(), 1); + } work.getReplLogger().eventLog(String.valueOf(ev.getEventId()), eventHandler.dumpType().toString()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java index ae70298ce2b..077ee3ed5ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java @@ -45,6 +45,7 @@ public interface EventHandler { final ReplScope replScope; final ReplScope oldReplScope; private Set<String> tablesForBootstrap; + private boolean dmdCreated; public Context(Path eventRoot, Path dumpRoot, Path cmRoot, Hive db, HiveConf hiveConf, ReplicationSpec replicationSpec, ReplScope replScope, ReplScope oldReplScope, @@ -77,6 +78,7 @@ public interface EventHandler { } DumpMetaData createDmd(EventHandler eventHandler) { + this.dmdCreated = true; return new DumpMetaData( eventRoot, eventHandler.dumpType(), @@ -99,5 +101,8 @@ public interface EventHandler { assert tableName != null; return tablesForBootstrap.remove(tableName.toLowerCase()); } + public boolean isDmdCreated() { + return dmdCreated; + } } }