This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 84b42876441 HIVE-28962: Prevent committing outputs if an exception was thrown in the Tez processor (#5824)
84b42876441 is described below

commit 84b42876441ec8984f1a39367ae77627cc96a275
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Tue May 27 22:04:38 2025 +0300

    HIVE-28962: Prevent committing outputs if an exception was thrown in the Tez processor (#5824)
---
 .../hadoop/hive/ql/exec/tez/TezProcessor.java      | 56 ++++++++++------------
 1 file changed, 25 insertions(+), 31 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index e6fe37762a4..7f305d2046d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.io.function.IOConsumer;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.runtime.api.TaskFailureType;
@@ -65,7 +66,7 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
    * This provides the ability to pass things into TezProcessor, which is normally impossible
    * because of how Tez APIs are structured. Piggyback on ExecutionContext.
    */
-  public static interface Hook {
+  private interface Hook {
     void initializeHook(TezProcessor source);
   }
 
@@ -282,8 +283,8 @@ protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
       Map<String, LogicalOutput> outputs)
       throws Exception {
     Throwable originalThrowable = null;
-    try {
 
+    try {
       MRTaskReporter mrReporter = new MRTaskReporter(getContext());
       // Init and run are both potentially long, and blocking operations. Synchronization
       // with the 'abort' operation will not work since if they end up blocking on a monitor
@@ -292,16 +293,16 @@ protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
       rproc.init(mrReporter, inputs, outputs);
       rproc.run();
 
-      perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
     } catch (Throwable t) {
       rproc.setAborted(true);
       originalThrowable = t;
+
     } finally {
-      if (originalThrowable != null && (originalThrowable instanceof Error ||
-        Throwables.getRootCause(originalThrowable) instanceof Error)) {
+      if (originalThrowable != null && (originalThrowable instanceof Error || 
+          Throwables.getRootCause(originalThrowable) instanceof Error)) {
         LOG.error("Cannot recover from this FATAL error", originalThrowable);
-        getContext().reportFailure(TaskFailureType.FATAL, originalThrowable,
-                      "Cannot recover from this error");
+        getContext().reportFailure(TaskFailureType.FATAL, originalThrowable, "Cannot recover from this error");
+
         throw new RuntimeException(originalThrowable);
       }
 
@@ -309,21 +310,10 @@ protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
         if (rproc != null) {
           rproc.close();
         }
-      } catch (Throwable t) {
         if (originalThrowable == null) {
-          originalThrowable = t;
-        }
-      }
+          closeOutputTasks(outputs, MROutput::commit);
 
-      // commit the output tasks
-      try {
-        for (LogicalOutput output : outputs.values()) {
-          if (output instanceof MROutput) {
-            MROutput mrOutput = (MROutput) output;
-            if (mrOutput.isCommitRequired()) {
-              mrOutput.commit();
-            }
-          }
+          perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
         }
       } catch (Throwable t) {
         if (originalThrowable == null) {
@@ -333,19 +323,23 @@ protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
 
       if (originalThrowable != null) {
         LOG.error("Failed initializeAndRunProcessor", originalThrowable);
-        // abort the output tasks
-        for (LogicalOutput output : outputs.values()) {
-          if (output instanceof MROutput) {
-            MROutput mrOutput = (MROutput) output;
-            if (mrOutput.isCommitRequired()) {
-              mrOutput.abort();
-            }
-          }
-        }
+        closeOutputTasks(outputs, MROutput::abort);
+
         if (originalThrowable instanceof InterruptedException) {
           throw (InterruptedException) originalThrowable;
-        } else {
-          throw new RuntimeException(originalThrowable);
+        }
+        throw new RuntimeException(originalThrowable);
+      }
+    }
+  }
+
+  private static void closeOutputTasks(
+      Map<String, LogicalOutput> outputs, IOConsumer<MROutput> committer) throws IOException {
+    for (LogicalOutput output : outputs.values()) {
+      if (output instanceof MROutput) {
+        MROutput mrOutput = (MROutput) output;
+        if (mrOutput.isCommitRequired()) {
+          committer.accept(mrOutput);
         }
       }
     }

Reply via email to