This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new c62fa28ce26 HIVE-29124: Avoid committing files when a task is aborted even though some source has completed (#6011)
c62fa28ce26 is described below

commit c62fa28ce26fbcd789c06aafc82ab79630226a3d
Author: zhengchenyu <[email protected]>
AuthorDate: Thu Sep 4 22:52:12 2025 +0800

    HIVE-29124: Avoid committing files when a task is aborted even though some source has completed (#6011)
---
 .../hive/ql/exec/tez/ReduceRecordProcessor.java    | 23 +++++++++++++---------
 .../hive/ql/exec/tez/ReduceRecordSource.java       | 10 +++++++++-
 .../org/apache/hadoop/hive/ql/plan/BaseWork.java   |  8 ++++++++
 .../org/apache/hadoop/hive/ql/plan/ReduceWork.java |  8 ++++++++
 4 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index c4608298858..96714f29dc6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -305,6 +305,14 @@ public void abort() {
     } else {
       LOG.info("reducer not setup yet. abort not being forwarded");
     }
+    if (mergeWorkList != null) {
+      for (BaseWork redWork : mergeWorkList) {
+        redWork.abort();
+      }
+    }
+    if (reduceWork != null) {
+      reduceWork.abort();
+    }
   }
 
   /**
@@ -343,16 +351,13 @@ void close() {
     }
 
     try {
-      if (isAborted()) {
-        for (ReduceRecordSource rs : sources) {
-          if (!rs.close()) {
-            setAborted(false); // Preserving the old logic. Hmm...
-            break;
-          }
-        }
-      }
-
       boolean abort = isAborted();
+      for (ReduceRecordSource rs : sources) {
+        abort |= !rs.close();
+      }
+      if (abort) {
+        setAborted(true);
+      }
       reducer.close(abort);
       if (mergeWorkList != null) {
         for (BaseWork redWork : mergeWorkList) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 5c30f2ed994..d44542c7aae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -520,6 +520,14 @@ private void processVectorGroup(BytesWritable keyWritable,
     }
   }
 
+
+  /**
+   * Closes resources and returns whether the records were successfully processed.
+   * @return boolean indicating the success status:
+   * - true: All data has been processed successfully without exceptions.
+   * - false: Exceptions were encountered during data processing.
+   * @throws Exception unexpected errors occur during closing.
+   */
   boolean close() throws Exception {
     try {
       if (handleGroupKey && groupKey != null) {
@@ -533,7 +541,7 @@ boolean close() throws Exception {
             + e.getMessage(), e);
       }
     }
-    return abort;
+    return !abort;
   }
 
   public ObjectInspector getObjectInspector() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 87e78444f96..201cf0300e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -545,4 +545,12 @@ public void setInputSourceToRuntimeValuesInfo(
           String workName, RuntimeValuesInfo runtimeValuesInfo) {
     inputSourceToRuntimeValuesInfo.put(workName, runtimeValuesInfo);
   }
+
+  public void abort() {
+    if (dummyOps != null) {
+      for (Operator<?> dummyOp : dummyOps) {
+        dummyOp.abort();
+      }
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index 8e188ac2f61..6d554d83f89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -381,4 +381,12 @@ public void setEdgePropRef(TezEdgeProperty edgeProp) {
   public TezEdgeProperty getEdgePropRef() {
     return edgeProp;
   }
+
+  @Override
+  public void abort() {
+    super.abort();
+    if (reducer != null) {
+      reducer.abort();
+    }
+  }
 }

Reply via email to