This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fdde02b5 [#2591] fix(client): Missing task_id propagation in getLocalShuffleDataV3 (#2605)
9fdde02b5 is described below

commit 9fdde02b51a04176e29508a39583e0021547f3a0
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Sep 11 17:43:37 2025 +0800

    [#2591] fix(client): Missing task_id propagation in getLocalShuffleDataV3 (#2605)
    
    ### What changes were proposed in this pull request?
    
    Fix missing task_id propagation in getLocalShuffleDataV3
    
    ### Why are the changes needed?
    
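    Without this fix, the task_id is not propagated into the getLocalShuffleDataV3 request, so the request carries an invalid task_id.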
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal tests.
---
 .../java/org/apache/spark/shuffle/reader/RssShuffleReader.java     | 7 +++----
 .../java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java | 1 +
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index f03aa9955..93584e124 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -86,6 +86,7 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
   private ShuffleDependency<K, ?, C> shuffleDependency;
   private int numMaps;
   private Serializer serializer;
+  private long taskAttemptId;
   private String taskId;
   private String basePath;
   private int partitionNum;
@@ -134,6 +135,7 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
     this.shuffleId = shuffleDependency.shuffleId();
     this.serializer = rssShuffleHandle.getDependency().serializer();
     this.taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
+    this.taskAttemptId = context.taskAttemptId();
     this.basePath = basePath;
     this.partitionNum = partitionNum;
     this.partitionToExpectBlocks = partitionToExpectBlocks;
@@ -317,10 +319,7 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
                 .retryMax(retryMax)
                 .retryIntervalMax(retryIntervalMax)
                 .rssConf(rssConf)
-                .taskAttemptId(
-                    Optional.ofNullable(TaskContext.get())
-                        .map(taskContext -> taskContext.taskAttemptId())
-                        .orElse(0L));
+                .taskAttemptId(taskAttemptId);
         if (codec.isPresent() && 
rssConf.get(RSS_READ_OVERLAPPING_DECOMPRESSION_ENABLED)) {
           builder
               .overlappingDecompressionEnabled(true)
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 8586d3493..dcde0ca3e 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -199,6 +199,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
     request.setRetryMax(builder.getRetryMax());
     request.setRetryIntervalMax(builder.getRetryIntervalMax());
     request.setReadCostTracker(readCostTracker);
+    request.setTaskAttemptId(builder.getTaskAttemptId());
     if (builder.isExpectedTaskIdsBitmapFilterEnable()) {
       request.useExpectedTaskIdsBitmapFilter();
     }

Reply via email to