This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9fdde02b5 [#2591] fix(client): Missing task_id propagation in
getLocalShuffleDataV3 (#2605)
9fdde02b5 is described below
commit 9fdde02b51a04176e29508a39583e0021547f3a0
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Sep 11 17:43:37 2025 +0800
[#2591] fix(client): Missing task_id propagation in getLocalShuffleDataV3
(#2605)
### What changes were proposed in this pull request?
Fix missing task_id propagation in getLocalShuffleDataV3
### Why are the changes needed?
Task_id is invalid
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.
---
.../java/org/apache/spark/shuffle/reader/RssShuffleReader.java | 7 +++----
.../java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java | 1 +
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index f03aa9955..93584e124 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -86,6 +86,7 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
private ShuffleDependency<K, ?, C> shuffleDependency;
private int numMaps;
private Serializer serializer;
+ private long taskAttemptId;
private String taskId;
private String basePath;
private int partitionNum;
@@ -134,6 +135,7 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
this.shuffleId = shuffleDependency.shuffleId();
this.serializer = rssShuffleHandle.getDependency().serializer();
this.taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
+ this.taskAttemptId = context.taskAttemptId();
this.basePath = basePath;
this.partitionNum = partitionNum;
this.partitionToExpectBlocks = partitionToExpectBlocks;
@@ -317,10 +319,7 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
.retryMax(retryMax)
.retryIntervalMax(retryIntervalMax)
.rssConf(rssConf)
- .taskAttemptId(
- Optional.ofNullable(TaskContext.get())
- .map(taskContext -> taskContext.taskAttemptId())
- .orElse(0L));
+ .taskAttemptId(taskAttemptId);
if (codec.isPresent() &&
rssConf.get(RSS_READ_OVERLAPPING_DECOMPRESSION_ENABLED)) {
builder
.overlappingDecompressionEnabled(true)
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 8586d3493..dcde0ca3e 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -199,6 +199,7 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
request.setRetryMax(builder.getRetryMax());
request.setRetryIntervalMax(builder.getRetryIntervalMax());
request.setReadCostTracker(readCostTracker);
+ request.setTaskAttemptId(builder.getTaskAttemptId());
if (builder.isExpectedTaskIdsBitmapFilterEnable()) {
request.useExpectedTaskIdsBitmapFilter();
}