This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 84b42876441 HIVE-28962: Prevent committing outputs if an exception was
thrown in the Tez processor (#5824)
84b42876441 is described below
commit 84b42876441ec8984f1a39367ae77627cc96a275
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Tue May 27 22:04:38 2025 +0300
HIVE-28962: Prevent committing outputs if an exception was thrown in the
Tez processor (#5824)
---
.../hadoop/hive/ql/exec/tez/TezProcessor.java | 56 ++++++++++------------
1 file changed, 25 insertions(+), 31 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index e6fe37762a4..7f305d2046d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.io.function.IOConsumer;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.runtime.api.TaskFailureType;
@@ -65,7 +66,7 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
* This provides the ability to pass things into TezProcessor, which is normally impossible
* because of how Tez APIs are structured. Piggyback on ExecutionContext.
*/
- public static interface Hook {
+ private interface Hook {
void initializeHook(TezProcessor source);
}
@@ -282,8 +283,8 @@ protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs)
throws Exception {
Throwable originalThrowable = null;
- try {
+ try {
MRTaskReporter mrReporter = new MRTaskReporter(getContext());
// Init and run are both potentially long, and blocking operations. Synchronization
// with the 'abort' operation will not work since if they end up blocking on a monitor
@@ -292,16 +293,16 @@ protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
rproc.init(mrReporter, inputs, outputs);
rproc.run();
- perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
} catch (Throwable t) {
rproc.setAborted(true);
originalThrowable = t;
+
} finally {
- if (originalThrowable != null && (originalThrowable instanceof Error ||
- Throwables.getRootCause(originalThrowable) instanceof Error)) {
+ if (originalThrowable != null && (originalThrowable instanceof Error ||
+ Throwables.getRootCause(originalThrowable) instanceof Error)) {
LOG.error("Cannot recover from this FATAL error", originalThrowable);
- getContext().reportFailure(TaskFailureType.FATAL, originalThrowable,
- "Cannot recover from this error");
+ getContext().reportFailure(TaskFailureType.FATAL, originalThrowable, "Cannot recover from this error");
+
throw new RuntimeException(originalThrowable);
}
@@ -309,21 +310,10 @@ protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
if (rproc != null) {
rproc.close();
}
- } catch (Throwable t) {
if (originalThrowable == null) {
- originalThrowable = t;
- }
- }
+ closeOutputTasks(outputs, MROutput::commit);
- // commit the output tasks
- try {
- for (LogicalOutput output : outputs.values()) {
- if (output instanceof MROutput) {
- MROutput mrOutput = (MROutput) output;
- if (mrOutput.isCommitRequired()) {
- mrOutput.commit();
- }
- }
+ perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
}
} catch (Throwable t) {
if (originalThrowable == null) {
@@ -333,19 +323,23 @@ protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
if (originalThrowable != null) {
LOG.error("Failed initializeAndRunProcessor", originalThrowable);
- // abort the output tasks
- for (LogicalOutput output : outputs.values()) {
- if (output instanceof MROutput) {
- MROutput mrOutput = (MROutput) output;
- if (mrOutput.isCommitRequired()) {
- mrOutput.abort();
- }
- }
- }
+ closeOutputTasks(outputs, MROutput::abort);
+
if (originalThrowable instanceof InterruptedException) {
throw (InterruptedException) originalThrowable;
- } else {
- throw new RuntimeException(originalThrowable);
+ }
+ throw new RuntimeException(originalThrowable);
+ }
+ }
+ }
+
+ private static void closeOutputTasks(
+ Map<String, LogicalOutput> outputs, IOConsumer<MROutput> committer) throws IOException {
+ for (LogicalOutput output : outputs.values()) {
+ if (output instanceof MROutput) {
+ MROutput mrOutput = (MROutput) output;
+ if (mrOutput.isCommitRequired()) {
+ committer.accept(mrOutput);
}
}
}