This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new b5e21aa61f4 HIVE-29459: [DR][HiveACIDReplication] Add clearDanglingTxnTask at the end (#6334)
b5e21aa61f4 is described below
commit b5e21aa61f438546d4c14c2a4587f1bf2b938f5d
Author: Harshal Patel <[email protected]>
AuthorDate: Thu Mar 5 09:37:55 2026 +0530
HIVE-29459: [DR][HiveACIDReplication] Add clearDanglingTxnTask at the end (#6334)
---
.../parse/TestReplicationScenariosAcidTables.java | 42 ++++++++++++++++++++++
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 9 +++--
2 files changed, 49 insertions(+), 2 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index d59f00d4dc1..c81a0d11899 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -436,6 +436,48 @@ public void testRemoveDanglingTxnWithOpenTxnOnSourceAndDanglingTxnOnDR() throws
}
}
+  @Test
+  public void testClearDanglingTxnRunsOnlyAfterFinalIncrementalRound() throws Throwable {
+    List<String> withClauseList = Arrays.asList(
+        "'" + HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET + "'='true'",
+        "'" + HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname + "'='1'");
+    String insertStmt = "insert into sales_transactional partition(country) values "
+        + "(102, 'Phone', 800.00, '2026-02-11 11:30:00', 'Canada'),"
+        + "(103, 'Tablet', 450.00, '2026-02-11 12:15:00', 'USA'),"
+        + "(104, 'Monitor', 300.00, '2026-02-11 14:00:00', 'UK')";
+
+    primary.run("use " + primaryDbName)
+        .run("create table sales_transactional (sale_id int, product_name string, amount decimal(10,2), "
+            + "sale_date timestamp) partitioned by (country string) stored as orc "
+            + "tblproperties (\"transactional\"=\"true\")")
+        .run(insertStmt)
+        .run(insertStmt);
+
+    primary.dump(primaryDbName, withClauseList);
+    replica.load(replicatedDbName, primaryDbName, withClauseList)
+        .run("use " + replicatedDbName)
+        .run("select count(*) from sales_transactional")
+        .verifyResult("6");
+
+    primary.run("use " + primaryDbName);
+    for (int i = 0; i < 12; i++) {
+      primary.run(insertStmt);
+    }
+    primary.run("truncate table sales_transactional");
+    for (int i = 0; i < 5; i++) {
+      primary.run(insertStmt);
+    }
+
+    WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, withClauseList);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseList)
+        .run("use " + replicatedDbName)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(incrementalDump.lastReplicationId)
+        .run("select count(*) from sales_transactional")
+        .verifyResult("15");
+  }
+
   private List<Long> getOpenTxnCountFromDump(FileSystem fs, Path openTxnDumpPath) throws IOException {
     List<Long> openTxnIds = new ArrayList<>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 1e8a82b3a60..9287fd75e76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -963,8 +963,11 @@ private int executeIncrementalLoad(long loadStartTime) throws Exception {
((IncrementalLoadLogger)work.incrementalLoadTasksBuilder().getReplLogger()).initiateEventTimestamp(currentTimestamp);
     LOG.info("REPL_INCREMENTAL_LOAD stage duration : {} ms", currentTimestamp - loadStartTime);
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET)) {
-
+    // Clear dangling transactions only once all incremental work for this dump is exhausted.
+    // Running this in intermediate rounds can remove source->target txn mappings that later
+    // rounds still depend on for write-id replay.
+    boolean hasPendingIncrementalWork = builder.hasMoreWork() || work.hasBootstrapLoadTasks();
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET) && !hasPendingIncrementalWork) {
       ClearDanglingTxnWork clearDanglingTxnWork = new ClearDanglingTxnWork(work.getDumpDirectory(), targetDb.getName());
       Task<ClearDanglingTxnWork> clearDanglingTxnTaskTask = TaskFactory.get(clearDanglingTxnWork, conf);
if (childTasks.isEmpty()) {
@@ -972,6 +975,8 @@ private int executeIncrementalLoad(long loadStartTime) throws Exception {
} else {
         DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(Collections.singletonList(clearDanglingTxnTaskTask)));
}
+    } else if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET)) {
+      LOG.info("Skipping dangling transaction cleanup in this iteration as incremental load has pending work.");
}
return 0;