This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 44046af1a76 HIVE-27899: Killed speculative execution task attempt
should not commit file (#4899)
44046af1a76 is described below
commit 44046af1a76c8b910d6dfc06cec2eb92e2ea7bc3
Author: zhengchenyu <[email protected]>
AuthorDate: Sat Sep 6 22:45:25 2025 +0800
HIVE-27899: Killed speculative execution task attempt should not commit
file (#4899)
---
.../apache/hadoop/hive/ql/exec/tez/TezProcessor.java | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 7f305d2046d..7ed091bca2d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -137,7 +137,6 @@ void shutDownProgressTaskService() {
}
}
- protected ProcessorContext processorContext;
private ReflectiveProgressHelper progressHelper;
protected static final NumberFormat taskIdFormat =
NumberFormat.getInstance();
@@ -182,11 +181,11 @@ public void handleEvents(List<Event> arg0) {
@Override
public void initialize() throws IOException {
perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
- Configuration conf =
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+ ProcessorContext processorContext = getContext();
+ Configuration conf =
TezUtils.createConfFromUserPayload(processorContext.getUserPayload());
this.jobConf = new JobConf(conf);
this.jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
- this.processorContext = getContext();
- initTezAttributes();
+ initTezAttributes(processorContext);
ExecutionContext execCtx = processorContext.getExecutionContext();
if (execCtx instanceof Hook) {
((Hook)execCtx).initializeHook(this);
@@ -196,7 +195,7 @@ public void initialize() throws IOException {
}
- private void initTezAttributes() {
+ private void initTezAttributes(ProcessorContext processorContext) {
jobConf.set(HIVE_TEZ_VERTEX_NAME, processorContext.getTaskVertexName());
jobConf.setInt(HIVE_TEZ_VERTEX_INDEX,
processorContext.getTaskVertexIndex());
jobConf.setInt(HIVE_TEZ_TASK_INDEX, processorContext.getTaskIndex());
@@ -293,6 +292,17 @@ protected void initializeAndRunProcessor(Map<String,
LogicalInput> inputs,
rproc.init(mrReporter, inputs, outputs);
rproc.run();
+ // Ask the AM whether this attempt may commit by calling canCommit.
+ // If no other speculative attempt has called canCommit, continue.
+ // If another speculative attempt called canCommit first, wait until
+ // this attempt is killed or the committed task fails.
+ while (!getContext().canCommit()) {
+ // If canCommit returns false and we enter this loop, another task
+ // attempt has already committed. This attempt only needs to sleep for
+ // a relatively long time while waiting to be killed.
+ // However, we must allow for a low-probability scenario: the rare case
+ // where a task attempt fails after committing. In that situation the
+ // delay must not be excessively long, so that this attempt can still
+ // react in time.
+ // 500ms is chosen as a trade-off value.
+ Thread.sleep(500);
+ }
} catch (Throwable t) {
rproc.setAborted(true);
originalThrowable = t;