This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 44046af1a76 HIVE-27899: Killed speculative execution task attempt should not commit file (#4899)
44046af1a76 is described below

commit 44046af1a76c8b910d6dfc06cec2eb92e2ea7bc3
Author: zhengchenyu <[email protected]>
AuthorDate: Sat Sep 6 22:45:25 2025 +0800

    HIVE-27899: Killed speculative execution task attempt should not commit file (#4899)
---
 .../apache/hadoop/hive/ql/exec/tez/TezProcessor.java | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 7f305d2046d..7ed091bca2d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -137,7 +137,6 @@ void shutDownProgressTaskService() {
     }
   }
 
-  protected ProcessorContext processorContext;
   private ReflectiveProgressHelper progressHelper;
 
   protected static final NumberFormat taskIdFormat = 
NumberFormat.getInstance();
@@ -182,11 +181,11 @@ public void handleEvents(List<Event> arg0) {
   @Override
   public void initialize() throws IOException {
     perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
-    Configuration conf = 
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    ProcessorContext processorContext = getContext();
+    Configuration conf = 
TezUtils.createConfFromUserPayload(processorContext.getUserPayload());
     this.jobConf = new JobConf(conf);
     
this.jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
-    this.processorContext = getContext();
-    initTezAttributes();
+    initTezAttributes(processorContext);
     ExecutionContext execCtx = processorContext.getExecutionContext();
     if (execCtx instanceof Hook) {
       ((Hook)execCtx).initializeHook(this);
@@ -196,7 +195,7 @@ public void initialize() throws IOException {
   }
 
 
-  private void initTezAttributes() {
+  private void initTezAttributes(ProcessorContext processorContext) {
     jobConf.set(HIVE_TEZ_VERTEX_NAME, processorContext.getTaskVertexName());
     jobConf.setInt(HIVE_TEZ_VERTEX_INDEX, 
processorContext.getTaskVertexIndex());
     jobConf.setInt(HIVE_TEZ_TASK_INDEX, processorContext.getTaskIndex());
@@ -293,6 +292,17 @@ protected void initializeAndRunProcessor(Map<String, 
LogicalInput> inputs,
       rproc.init(mrReporter, inputs, outputs);
       rproc.run();
 
+      // Try to call canCommit to AM. If there is no other speculative attempt 
execute canCommit, then continue.
+      // If there are other speculative attempt execute canCommit first, then 
wait until the attempt is killed
+      // or the committed task fails.
+      while (!getContext().canCommit()) {
+        // If canCommit returns false, and we enter this loop, it means 
another task attempt has already committed.
+        // This task attempt only needs to sleep for a relatively long time 
while waiting to be killed.
+        // However, we must avoid a low-probability scenario: a rare case 
where a task attempt fails after committing.
+        // In that situation, the delay must not be excessively long so this 
attempt can still react in time.
+        // 500ms is chosen as a trade-off value.
+        Thread.sleep(500);
+      }
     } catch (Throwable t) {
       rproc.setAborted(true);
       originalThrowable = t;

Reply via email to