This is an automated email from the ASF dual-hosted git repository.

tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b5e21aa61f4 HIVE-29459: [DR][HiveACIDReplication] Add clearDanglingTxnTask at the end (#6334)
b5e21aa61f4 is described below

commit b5e21aa61f438546d4c14c2a4587f1bf2b938f5d
Author: Harshal Patel <[email protected]>
AuthorDate: Thu Mar 5 09:37:55 2026 +0530

    HIVE-29459: [DR][HiveACIDReplication] Add clearDanglingTxnTask at the end (#6334)
---
 .../parse/TestReplicationScenariosAcidTables.java  | 42 ++++++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  9 +++--
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index d59f00d4dc1..c81a0d11899 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -436,6 +436,48 @@ public void testRemoveDanglingTxnWithOpenTxnOnSourceAndDanglingTxnOnDR() throws
     }
   }
 
+  @Test
+  public void testClearDanglingTxnRunsOnlyAfterFinalIncrementalRound() throws Throwable {
+    List<String> withClauseList = Arrays.asList(
+            "'" + HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname + "'='1'");
+    String insertStmt = "insert into sales_transactional partition(country) values "
+            + "(102, 'Phone', 800.00, '2026-02-11 11:30:00', 'Canada'),"
+            + "(103, 'Tablet', 450.00, '2026-02-11 12:15:00', 'USA'),"
+            + "(104, 'Monitor', 300.00, '2026-02-11 14:00:00', 'UK')";
+
+    primary.run("use " + primaryDbName)
+            .run("create table sales_transactional (sale_id int, product_name string, amount decimal(10,2), "
+                    + "sale_date timestamp) partitioned by (country string) stored as orc "
+                    + "tblproperties (\"transactional\"=\"true\")")
+            .run(insertStmt)
+            .run(insertStmt);
+
+    primary.dump(primaryDbName, withClauseList);
+    replica.load(replicatedDbName, primaryDbName, withClauseList)
+            .run("use " + replicatedDbName)
+            .run("select count(*) from sales_transactional")
+            .verifyResult("6");
+
+    primary.run("use " + primaryDbName);
+    for (int i = 0; i < 12; i++) {
+      primary.run(insertStmt);
+    }
+    primary.run("truncate table sales_transactional");
+    for (int i = 0; i < 5; i++) {
+      primary.run(insertStmt);
+    }
+
+    WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, withClauseList);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseList)
+            .run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(incrementalDump.lastReplicationId)
+            .run("select count(*) from sales_transactional")
+            .verifyResult("15");
+  }
+
 
   private List<Long> getOpenTxnCountFromDump(FileSystem fs, Path openTxnDumpPath) throws IOException {
     List<Long> openTxnIds = new ArrayList<>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 1e8a82b3a60..9287fd75e76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -963,8 +963,11 @@ private int executeIncrementalLoad(long loadStartTime) throws Exception {
     ((IncrementalLoadLogger)work.incrementalLoadTasksBuilder().getReplLogger()).initiateEventTimestamp(currentTimestamp);
     LOG.info("REPL_INCREMENTAL_LOAD stage duration : {} ms", currentTimestamp - loadStartTime);
 
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET)) {
-
+    // Clear dangling transactions only once all incremental work for this dump is exhausted.
+    // Running this in intermediate rounds can remove source->target txn mappings that later
+    // rounds still depend on for write-id replay.
+    boolean hasPendingIncrementalWork = builder.hasMoreWork() || work.hasBootstrapLoadTasks();
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET) && !hasPendingIncrementalWork) {
       ClearDanglingTxnWork clearDanglingTxnWork = new ClearDanglingTxnWork(work.getDumpDirectory(), targetDb.getName());
       Task<ClearDanglingTxnWork> clearDanglingTxnTaskTask = TaskFactory.get(clearDanglingTxnWork, conf);
       if (childTasks.isEmpty()) {
@@ -972,6 +975,8 @@ private int executeIncrementalLoad(long loadStartTime) throws Exception {
       } else {
         DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(Collections.singletonList(clearDanglingTxnTaskTask)));
       }
+    } else if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET)) {
+      LOG.info("Skipping dangling transaction cleanup in this iteration as incremental load has pending work.");
     }
 
     return 0;

Reply via email to