This is an automated email from the ASF dual-hosted git repository.
yl09099 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 6843c06b1 fix(spark3): Reserve previous writer initialization way for
gluten + uniffle (#2437)
6843c06b1 is described below
commit 6843c06b1b1e4fe8c8c2be84e6a15f831c4b9a4c
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Apr 11 16:18:25 2025 +0800
fix(spark3): Reserve previous writer initialization way for gluten +
uniffle (#2437)
Thanks for your review @summaryzb @jerqi
Thank you for your contribution @zuston
---
.../shuffle/manager/RssShuffleManagerBase.java | 4 ++++
.../spark/shuffle/writer/RssShuffleWriter.java | 28 ++++++++++++++++++++++
2 files changed, 32 insertions(+)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 5bf5331ef..fcfc72818 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -928,6 +928,10 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
return managerClientSupplier;
}
+ public Supplier<ShuffleManagerClient> getShuffleManagerClientSupplier() {
+ return managerClientSupplier;
+ }
+
@Override
public ShuffleHandleInfo getShuffleHandleInfoByShuffleId(int shuffleId) {
return shuffleHandleInfoManager.get(shuffleId);
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index a379e516b..14c4a68af 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -231,6 +231,34 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
this.recordReportFailedShuffleservers = Sets.newConcurrentHashSet();
}
+ // Gluten needs this constructor
+ public RssShuffleWriter(
+ String appId,
+ int shuffleId,
+ String taskId,
+ long taskAttemptId,
+ ShuffleWriteMetrics shuffleWriteMetrics,
+ RssShuffleManager shuffleManager,
+ SparkConf sparkConf,
+ ShuffleWriteClient shuffleWriteClient,
+ RssShuffleHandle<K, V, C> rssHandle,
+ Function<String, Boolean> taskFailureCallback,
+ TaskContext context) {
+ this(
+ appId,
+ shuffleId,
+ taskId,
+ taskAttemptId,
+ shuffleWriteMetrics,
+ shuffleManager,
+ sparkConf,
+ shuffleWriteClient,
+ shuffleManager.getShuffleManagerClientSupplier(),
+ rssHandle,
+ taskFailureCallback,
+ context);
+ }
+
public RssShuffleWriter(
String appId,
int shuffleId,