This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c62fa28ce26 HIVE-29124: Avoid committing files when a task is aborted
even though some source has completed (#6011)
c62fa28ce26 is described below
commit c62fa28ce26fbcd789c06aafc82ab79630226a3d
Author: zhengchenyu <[email protected]>
AuthorDate: Thu Sep 4 22:52:12 2025 +0800
HIVE-29124: Avoid committing files when a task is aborted even though some
source has completed (#6011)
---
.../hive/ql/exec/tez/ReduceRecordProcessor.java | 23 +++++++++++++---------
.../hive/ql/exec/tez/ReduceRecordSource.java | 10 +++++++++-
.../org/apache/hadoop/hive/ql/plan/BaseWork.java | 8 ++++++++
.../org/apache/hadoop/hive/ql/plan/ReduceWork.java | 8 ++++++++
4 files changed, 39 insertions(+), 10 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index c4608298858..96714f29dc6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -305,6 +305,14 @@ public void abort() {
} else {
LOG.info("reducer not setup yet. abort not being forwarded");
}
+ if (mergeWorkList != null) {
+ for (BaseWork redWork : mergeWorkList) {
+ redWork.abort();
+ }
+ }
+ if (reduceWork != null) {
+ reduceWork.abort();
+ }
}
/**
@@ -343,16 +351,13 @@ void close() {
}
try {
- if (isAborted()) {
- for (ReduceRecordSource rs : sources) {
- if (!rs.close()) {
- setAborted(false); // Preserving the old logic. Hmm...
- break;
- }
- }
- }
-
boolean abort = isAborted();
+ for (ReduceRecordSource rs : sources) {
+ abort |= !rs.close();
+ }
+ if (abort) {
+ setAborted(true);
+ }
reducer.close(abort);
if (mergeWorkList != null) {
for (BaseWork redWork : mergeWorkList) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 5c30f2ed994..d44542c7aae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -520,6 +520,14 @@ private void processVectorGroup(BytesWritable keyWritable,
}
}
+
+ /**
+ * Closes resources and returns whether the records were successfully processed.
+ * @return boolean indicating the success status:
+ * - true: All data has been processed successfully without exceptions.
+ * - false: Exceptions were encountered during data processing.
+ * @throws Exception unexpected errors occur during closing.
+ */
boolean close() throws Exception {
try {
if (handleGroupKey && groupKey != null) {
@@ -533,7 +541,7 @@ boolean close() throws Exception {
+ e.getMessage(), e);
}
}
- return abort;
+ return !abort;
}
public ObjectInspector getObjectInspector() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 87e78444f96..201cf0300e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -545,4 +545,12 @@ public void setInputSourceToRuntimeValuesInfo(
String workName, RuntimeValuesInfo runtimeValuesInfo) {
inputSourceToRuntimeValuesInfo.put(workName, runtimeValuesInfo);
}
+
+ public void abort() {
+ if (dummyOps != null) {
+ for (Operator<?> dummyOp : dummyOps) {
+ dummyOp.abort();
+ }
+ }
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index 8e188ac2f61..6d554d83f89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -381,4 +381,12 @@ public void setEdgePropRef(TezEdgeProperty edgeProp) {
public TezEdgeProperty getEdgePropRef() {
return edgeProp;
}
+
+ @Override
+ public void abort() {
+ super.abort();
+ if (reducer != null) {
+ reducer.abort();
+ }
+ }
}