This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 62746c40e [GLUTEN-4903][CELEBORN] Support multiple versions of Celeborn (#4913)
62746c40e is described below

commit 62746c40e6e725f1c20ad06251ef976350d2bfc3
Author: Kerwin Zhang <xiyu...@alibaba-inc.com>
AuthorDate: Fri Mar 15 17:22:38 2024 +0800

    [GLUTEN-4903][CELEBORN] Support multiple versions of Celeborn (#4913)
---
 .github/workflows/velox_be.yml                     |  22 +-
 docs/get-started/ClickHouse.md                     |   1 +
 docs/get-started/Velox.md                          |   4 +-
 .../CHCelebornHashBasedColumnarShuffleWriter.scala |   4 +-
 ...bornHashBasedColumnarShuffleWriterFactory.scala |   2 +
 .../gluten/celeborn/CelebornShuffleManager.java    | 134 ++++---
 .../celeborn/CelebornShuffleWriterFactory.java     |   1 +
 .../shuffle/gluten/celeborn/CelebornUtils.java     | 384 +++++++++++++++++++++
 .../CelebornHashBasedColumnarShuffleWriter.scala   |   3 +-
 gluten-celeborn/pom.xml                            |  10 +
 ...loxCelebornHashBasedColumnarShuffleWriter.scala |   2 +
 ...bornHashBasedColumnarShuffleWriterFactory.scala |   2 +
 pom.xml                                            |   2 +-
 tools/gluten-it/package/pom.xml                    |  10 +
 tools/gluten-it/pom.xml                            |   6 +
 15 files changed, 511 insertions(+), 76 deletions(-)

diff --git a/.github/workflows/velox_be.yml b/.github/workflows/velox_be.yml
index 8aeec14db..b53c678e5 100644
--- a/.github/workflows/velox_be.yml
+++ b/.github/workflows/velox_be.yml
@@ -450,11 +450,27 @@ jobs:
             --local --preset=velox --benchmark-type=h --error-on-memleak 
--off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
           && GLUTEN_IT_JVM_ARGS=-Xmx20G sbin/gluten-it.sh queries-compare \
             --local --preset=velox --benchmark-type=ds --error-on-memleak 
--off-heap-size=40g -s=10.0 --threads=32 --iterations=1'
-      - name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2 with Celeborn
+      - name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2 with 
Celeborn 0.4.0
         run: |
           $PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh \
-          'wget 
https://archive.apache.org/dist/incubator/celeborn/celeborn-0.3.0-incubating/apache-celeborn-0.3.0-incubating-bin.tgz
 && \
-          tar xzf apache-celeborn-0.3.0-incubating-bin.tgz -C /opt/ && mv 
/opt/apache-celeborn-0.3.0-incubating-bin /opt/celeborn && cd /opt/celeborn && \
+          'wget 
https://archive.apache.org/dist/incubator/celeborn/celeborn-0.4.0-incubating/apache-celeborn-0.4.0-incubating-bin.tgz
 && \
+          tar xzf apache-celeborn-0.4.0-incubating-bin.tgz -C /opt/ && mv 
/opt/apache-celeborn-0.4.0-incubating-bin /opt/celeborn && cd /opt/celeborn && \
+          mv ./conf/celeborn-env.sh.template ./conf/celeborn-env.sh && \
+          echo -e 
"CELEBORN_MASTER_MEMORY=4g\nCELEBORN_WORKER_MEMORY=4g\nCELEBORN_WORKER_OFFHEAP_MEMORY=8g"
 > ./conf/celeborn-env.sh && \
+          echo -e "celeborn.worker.commitFiles.threads 
128\nceleborn.worker.sortPartition.threads 64" > ./conf/celeborn-defaults.conf \
+          && bash ./sbin/start-master.sh && bash ./sbin/start-worker.sh && \
+          cd /opt/gluten/tools/gluten-it && mvn clean install 
-Pspark-3.2,rss,celeborn-0.4 \
+          && GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
+            --local --preset=velox-with-celeborn --benchmark-type=h 
--error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
+          && GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
+            --local --preset=velox-with-celeborn --benchmark-type=ds 
--error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 && \
+          bash /opt/celeborn/sbin/stop-worker.sh \
+          && bash /opt/celeborn/sbin/stop-master.sh && rm -rf /opt/celeborn'
+      - name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.2 with 
Celeborn 0.3.2
+        run: |
+          $PATH_TO_GLUTEN_TE/$OS_IMAGE_NAME/gha/gha-checkout/exec.sh \
+          'wget 
https://archive.apache.org/dist/incubator/celeborn/celeborn-0.3.2-incubating/apache-celeborn-0.3.2-incubating-bin.tgz
 && \
+          tar xzf apache-celeborn-0.3.2-incubating-bin.tgz -C /opt/ && mv 
/opt/apache-celeborn-0.3.2-incubating-bin /opt/celeborn && cd /opt/celeborn && \
           mv ./conf/celeborn-env.sh.template ./conf/celeborn-env.sh && \
           echo -e 
"CELEBORN_MASTER_MEMORY=4g\nCELEBORN_WORKER_MEMORY=4g\nCELEBORN_WORKER_OFFHEAP_MEMORY=8g"
 > ./conf/celeborn-env.sh && \
           echo -e "celeborn.worker.commitFiles.threads 
128\nceleborn.worker.sortPartition.threads 64" > ./conf/celeborn-defaults.conf \
diff --git a/docs/get-started/ClickHouse.md b/docs/get-started/ClickHouse.md
index 3d57d1afe..ad7183b90 100644
--- a/docs/get-started/ClickHouse.md
+++ b/docs/get-started/ClickHouse.md
@@ -671,6 +671,7 @@ spark.dynamicAllocation.enabled false
 ```
 
 #### Celeborn Columnar Shuffle Support
+Currently, the supported Celeborn versions are `0.3.x` and `0.4.0`.
 The native Celeborn support can be enabled by the following configuration
 ```
 
spark.shuffle.manager=org.apache.spark.shuffle.gluten.celeborn.CelebornShuffleManager
diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md
index 79ea501da..8a9001310 100644
--- a/docs/get-started/Velox.md
+++ b/docs/get-started/Velox.md
@@ -207,7 +207,9 @@ Currently there are several ways to access S3 in Spark. Please refer [Velox S3]
 
 ## Celeborn support
 
-Gluten with velox backend supports [Celeborn](https://github.com/apache/incubator-celeborn) as remote shuffle service. Below introduction is used to enable this feature
+Gluten with the Velox backend supports [Celeborn](https://github.com/apache/incubator-celeborn) as a remote shuffle service. Currently, the supported Celeborn versions are `0.3.x` and `0.4.0`.
+
+The steps below describe how to enable this feature.
 
 First refer to this URL(https://github.com/apache/incubator-celeborn) to setup a celeborn cluster.
 
diff --git 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala
 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala
index 46c77ab64..6e794fa50 100644
--- 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala
+++ 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala
@@ -37,12 +37,14 @@ import java.util
 import java.util.Locale
 
 class CHCelebornHashBasedColumnarShuffleWriter[K, V](
+    shuffleId: Int,
     handle: CelebornShuffleHandle[K, V, V],
     context: TaskContext,
     celebornConf: CelebornConf,
     client: ShuffleClient,
     writeMetrics: ShuffleWriteMetricsReporter)
   extends CelebornHashBasedColumnarShuffleWriter[K, V](
+    shuffleId: Int,
     handle,
     context,
     celebornConf,
@@ -90,7 +92,7 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V](
                   "allocations from make() to split()")
             }
             logInfo(s"Gluten shuffle writer: Trying to push $size bytes of 
data")
-            val spilled = jniWrapper.evict(nativeShuffleWriter);
+            val spilled = jniWrapper.evict(nativeShuffleWriter)
             logInfo(s"Gluten shuffle writer: Spilled $spilled / $size bytes of 
data")
             spilled
           }
diff --git 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriterFactory.scala
 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriterFactory.scala
index 53c0994f4..240b38aa0 100644
--- 
a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriterFactory.scala
+++ 
b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriterFactory.scala
@@ -29,12 +29,14 @@ class CHCelebornHashBasedColumnarShuffleWriterFactory 
extends CelebornShuffleWri
   override def backendName(): String = CHBackend.BACKEND_NAME
 
   override def createShuffleWriterInstance[K, V](
+      shuffleId: Int,
       handle: CelebornShuffleHandle[K, V, V],
       context: TaskContext,
       celebornConf: CelebornConf,
       client: ShuffleClient,
       writeMetrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
     new CHCelebornHashBasedColumnarShuffleWriter[K, V](
+      shuffleId,
       handle,
       context,
       celebornConf,
diff --git 
a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
 
b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
index c447e7ade..93681c50f 100644
--- 
a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
+++ 
b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleManager.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Iterators;
 import org.apache.celeborn.client.LifecycleManager;
 import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.protocol.ShuffleMode;
 import org.apache.spark.*;
 import org.apache.spark.shuffle.*;
@@ -33,6 +32,7 @@ import org.apache.spark.shuffle.sort.ColumnarShuffleManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.List;
@@ -89,6 +89,12 @@ public class CelebornShuffleManager implements 
ShuffleManager {
       ConcurrentHashMap.newKeySet();
   private final CelebornShuffleFallbackPolicyRunner fallbackPolicyRunner;
 
+  // for Celeborn 0.4.0
+  private final Object shuffleIdTracker;
+
+  // for Celeborn 0.4.0
+  private boolean throwsFetchFailure;
+
   public CelebornShuffleManager(SparkConf conf) {
     if (conf.getBoolean(LOCAL_SHUFFLE_READER_KEY, true)) {
       logger.warn(
@@ -99,6 +105,11 @@ public class CelebornShuffleManager implements 
ShuffleManager {
     this.conf = conf;
     this.celebornConf = SparkUtils.fromSparkConf(conf);
     this.fallbackPolicyRunner = new 
CelebornShuffleFallbackPolicyRunner(celebornConf);
+
+    this.shuffleIdTracker =
+        
CelebornUtils.createInstance(CelebornUtils.EXECUTOR_SHUFFLE_ID_TRACKER_NAME);
+
+    this.throwsFetchFailure = 
CelebornUtils.getThrowsFetchFailure(celebornConf);
   }
 
   private boolean isDriver() {
@@ -129,58 +140,6 @@ public class CelebornShuffleManager implements 
ShuffleManager {
     return _vanillaCelebornShuffleManager;
   }
 
-  private ShuffleClient getShuffleClient(
-      String appUniqueId,
-      String lifecycleManagerHost,
-      Integer lifecycleManagerPort,
-      CelebornConf conf,
-      UserIdentifier userIdentifier,
-      Boolean isDriver) {
-    try {
-      try {
-        Method method =
-            // for Celeborn 0.3.1 and above, see CELEBORN-804
-            ShuffleClient.class.getDeclaredMethod(
-                "get",
-                String.class,
-                String.class,
-                int.class,
-                CelebornConf.class,
-                UserIdentifier.class);
-        return (ShuffleClient)
-            method.invoke(
-                null,
-                appUniqueId,
-                lifecycleManagerHost,
-                lifecycleManagerPort,
-                conf,
-                userIdentifier);
-      } catch (NoSuchMethodException noMethod) {
-        Method method =
-            // for Celeborn 0.3.0, see CELEBORN-798
-            ShuffleClient.class.getDeclaredMethod(
-                "get",
-                String.class,
-                String.class,
-                int.class,
-                CelebornConf.class,
-                UserIdentifier.class,
-                boolean.class);
-        return (ShuffleClient)
-            method.invoke(
-                null,
-                appUniqueId,
-                lifecycleManagerHost,
-                lifecycleManagerPort,
-                conf,
-                userIdentifier,
-                isDriver);
-      }
-    } catch (ReflectiveOperationException rethrow) {
-      throw new RuntimeException(rethrow);
-    }
-  }
-
   private void initializeLifecycleManager() {
     // Only create LifecycleManager singleton in Driver. When register shuffle 
multiple times, we
     // need to ensure that LifecycleManager will only be created once. 
Parallelism needs to be
@@ -190,14 +149,19 @@ public class CelebornShuffleManager implements 
ShuffleManager {
       synchronized (this) {
         if (lifecycleManager == null) {
           lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
+
+          // for Celeborn 0.4.0
+          CelebornUtils.registerShuffleTrackerCallback(throwsFetchFailure, 
lifecycleManager);
+
           shuffleClient =
-              getShuffleClient(
+              CelebornUtils.getShuffleClient(
                   appUniqueId,
                   lifecycleManager.getHost(),
                   lifecycleManager.getPort(),
                   celebornConf,
                   lifecycleManager.getUserIdentifier(),
-                  Boolean.TRUE);
+                  Boolean.TRUE,
+                  null);
         }
       }
     }
@@ -205,12 +169,13 @@ public class CelebornShuffleManager implements 
ShuffleManager {
 
   private <K, V, C> ShuffleHandle registerCelebornShuffleHandle(
       int shuffleId, ShuffleDependency<K, V, C> dependency) {
-    return new CelebornShuffleHandle<>(
+    return CelebornUtils.getCelebornShuffleHandle(
         appUniqueId,
         lifecycleManager.getHost(),
         lifecycleManager.getPort(),
         lifecycleManager.getUserIdentifier(),
         shuffleId,
+        throwsFetchFailure,
         dependency.rdd().getNumPartitions(),
         dependency);
   }
@@ -220,6 +185,10 @@ public class CelebornShuffleManager implements 
ShuffleManager {
       int shuffleId, ShuffleDependency<K, V, C> dependency) {
     appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
     initializeLifecycleManager();
+
+    // for Celeborn 0.4.0
+    CelebornUtils.registerAppShuffleDeterminate(lifecycleManager, shuffleId, 
dependency);
+
     // Note: generate app unique id at driver side, make sure 
dependency.rdd.context
     // is the same SparkContext among different shuffleIds.
     // This method may be called many times.
@@ -248,13 +217,8 @@ public class CelebornShuffleManager implements 
ShuffleManager {
         return false;
       }
     }
-    if (appUniqueId == null) {
-      return true;
-    }
-    if (shuffleClient == null) {
-      return false;
-    }
-    return shuffleClient.unregisterShuffle(shuffleId, isDriver());
+    return CelebornUtils.unregisterShuffle(
+        lifecycleManager, shuffleClient, shuffleIdTracker, shuffleId, 
appUniqueId, isDriver());
   }
 
   @Override
@@ -284,16 +248,49 @@ public class CelebornShuffleManager implements 
ShuffleManager {
       ShuffleHandle handle, long mapId, TaskContext context, 
ShuffleWriteMetricsReporter metrics) {
     try {
       if (handle instanceof CelebornShuffleHandle) {
+        byte[] extension;
+        try {
+          Field field = 
CelebornShuffleHandle.class.getDeclaredField("extension");
+          field.setAccessible(true);
+          extension = (byte[]) field.get(handle);
+
+        } catch (NoSuchFieldException e) {
+          extension = null;
+        }
         @SuppressWarnings("unchecked")
         CelebornShuffleHandle<K, V, V> h = ((CelebornShuffleHandle<K, V, V>) 
handle);
         ShuffleClient client =
-            getShuffleClient(
+            CelebornUtils.getShuffleClient(
                 h.appUniqueId(),
                 h.lifecycleManagerHost(),
                 h.lifecycleManagerPort(),
                 celebornConf,
                 h.userIdentifier(),
-                false);
+                false,
+                extension);
+
+        int shuffleId;
+
+        // for Celeborn 0.4.0
+        try {
+          Method celebornShuffleIdMethod =
+              SparkUtils.class.getMethod(
+                  "celebornShuffleId",
+                  ShuffleClient.class,
+                  CelebornShuffleHandle.class,
+                  TaskContext.class,
+                  boolean.class);
+          shuffleId = (int) celebornShuffleIdMethod.invoke(null, 
shuffleClient, h, context, true);
+
+          Method trackMethod =
+              
CelebornUtils.getClassOrDefault(CelebornUtils.EXECUTOR_SHUFFLE_ID_TRACKER_NAME)
+                  .getMethod("track", int.class, int.class);
+          trackMethod.invoke(shuffleIdTracker, h.shuffleId(), shuffleId);
+
+        } catch (NoSuchMethodException e) {
+          shuffleId = h.dependency().shuffleId();
+        }
+
         if (!ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
           throw new UnsupportedOperationException(
               "Unrecognized shuffle write mode!" + 
celebornConf.shuffleWriterMode());
@@ -301,7 +298,7 @@ public class CelebornShuffleManager implements 
ShuffleManager {
         if (h.dependency() instanceof ColumnarShuffleDependency) {
           // columnar-based shuffle
           return writerFactory.createShuffleWriterInstance(
-              h, context, celebornConf, client, metrics);
+              shuffleId, h, context, celebornConf, client, metrics);
         } else {
           // row-based shuffle
           return vanillaCelebornShuffleManager().getWriter(handle, mapId, 
context, metrics);
@@ -327,7 +324,7 @@ public class CelebornShuffleManager implements 
ShuffleManager {
     if (handle instanceof CelebornShuffleHandle) {
       @SuppressWarnings("unchecked")
       CelebornShuffleHandle<K, ?, C> h = (CelebornShuffleHandle<K, ?, C>) 
handle;
-      return new CelebornShuffleReader<>(
+      return CelebornUtils.getCelebornShuffleReader(
           h,
           startPartition,
           endPartition,
@@ -335,7 +332,8 @@ public class CelebornShuffleManager implements 
ShuffleManager {
           endMapIndex,
           context,
           celebornConf,
-          metrics);
+          metrics,
+          shuffleIdTracker);
     }
     return columnarShuffleManager()
         .getReader(
diff --git 
a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleWriterFactory.java
 
b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleWriterFactory.java
index af8935fce..9beba71d4 100644
--- 
a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleWriterFactory.java
+++ 
b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornShuffleWriterFactory.java
@@ -27,6 +27,7 @@ public interface CelebornShuffleWriterFactory {
   String backendName();
 
   <K, V> ShuffleWriter<K, V> createShuffleWriterInstance(
+      int shuffleId,
       CelebornShuffleHandle<K, V, V> handle,
       TaskContext context,
       CelebornConf celebornConf,
diff --git 
a/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java
 
b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java
new file mode 100644
index 000000000..4593d019c
--- /dev/null
+++ 
b/gluten-celeborn/common/src/main/java/org/apache/spark/shuffle/gluten/celeborn/CelebornUtils.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.gluten.celeborn;
+
+import org.apache.celeborn.client.LifecycleManager;
+import org.apache.celeborn.client.ShuffleClient;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.spark.MapOutputTrackerMaster;
+import org.apache.spark.ShuffleDependency;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.TaskContext;
+import org.apache.spark.rdd.DeterministicLevel;
+import org.apache.spark.shuffle.ShuffleReadMetricsReporter;
+import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle;
+import org.apache.spark.shuffle.celeborn.CelebornShuffleReader;
+import org.apache.spark.shuffle.celeborn.SparkUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.function.Consumer;
+
+/**
+ * Reflection-based shims that let the Gluten Celeborn integration run against Celeborn client
+ * versions whose APIs differ (currently 0.3.x and 0.4.0).
+ *
+ * <p>Each helper tries the newest known signature first and falls back to older ones when the
+ * reflective lookup fails with {@link NoSuchMethodException}.
+ */
+public class CelebornUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(CelebornUtils.class);
+
+  /** Executor-side shuffle id tracker class used by the Celeborn 0.4.0 code paths. */
+  public static final String EXECUTOR_SHUFFLE_ID_TRACKER_NAME =
+      "org.apache.spark.shuffle.celeborn.ExecutorShuffleIdTracker";
+
+  /**
+   * Unregisters an application shuffle from the lifecycle manager and/or the shuffle client,
+   * using whichever API the Celeborn version on the classpath provides.
+   *
+   * <p>Within each API generation all methods are resolved before any is invoked, so a missing
+   * method cannot leave the shuffle half-unregistered before falling back to an older API.
+   *
+   * @param lifecycleManager lifecycle manager, or {@code null} if unavailable
+   * @param shuffleClient shuffle client, or {@code null} if unavailable
+   * @param shuffleIdTracker executor shuffle id tracker; only used on the Celeborn 0.4.0 path
+   * @param appShuffleId Spark shuffle id to unregister
+   * @param appUniqueId application id; {@code null} makes the 0.3.0/0.3.1 path return true
+   * @param isDriver forwarded to the 0.3.0/0.3.1 API only
+   * @return {@code true} on the newer paths; on the 0.3.0/0.3.1 path, {@code false} if there is
+   *     no shuffle client, otherwise the client's result
+   * @throws RuntimeException wrapping any reflective failure
+   */
+  public static boolean unregisterShuffle(
+      LifecycleManager lifecycleManager,
+      ShuffleClient shuffleClient,
+      Object shuffleIdTracker,
+      int appShuffleId,
+      String appUniqueId,
+      boolean isDriver) {
+    try {
+      // for Celeborn 0.4.0
+      try {
+        Method unregisterAppShuffle =
+            lifecycleManager == null
+                ? null
+                : lifecycleManager.getClass().getMethod("unregisterAppShuffle", int.class);
+        // getClassOrDefault yields Object.class when the tracker class is absent, so this lookup
+        // throws NoSuchMethodException and falls back instead of aborting with
+        // ClassNotFoundException.
+        Method unregisterAppShuffleId =
+            shuffleClient == null
+                ? null
+                : getClassOrDefault(EXECUTOR_SHUFFLE_ID_TRACKER_NAME)
+                    .getMethod("unregisterAppShuffleId", ShuffleClient.class, int.class);
+        if (unregisterAppShuffle != null) {
+          unregisterAppShuffle.invoke(lifecycleManager, appShuffleId);
+        }
+        if (unregisterAppShuffleId != null) {
+          unregisterAppShuffleId.invoke(shuffleIdTracker, shuffleClient, appShuffleId);
+        }
+        return true;
+      } catch (NoSuchMethodException ex) {
+        // pre-0.4.0 API: LifecycleManager#unregisterShuffle and ShuffleClient#cleanupShuffle
+        try {
+          Method unregisterShuffleMethod =
+              lifecycleManager == null
+                  ? null
+                  : lifecycleManager.getClass().getMethod("unregisterShuffle", int.class);
+          Method cleanupShuffleMethod =
+              shuffleClient == null
+                  ? null
+                  : shuffleClient.getClass().getMethod("cleanupShuffle", int.class);
+          if (unregisterShuffleMethod != null) {
+            unregisterShuffleMethod.invoke(lifecycleManager, appShuffleId);
+          }
+          if (cleanupShuffleMethod != null) {
+            cleanupShuffleMethod.invoke(shuffleClient, appShuffleId);
+          }
+          return true;
+        } catch (NoSuchMethodException ex1) {
+          // for Celeborn 0.3.0 and 0.3.1
+          if (appUniqueId == null) {
+            return true;
+          }
+
+          if (shuffleClient == null) {
+            return false;
+          }
+          Method unregisterShuffleMethod =
+              shuffleClient.getClass().getMethod("unregisterShuffle", int.class, boolean.class);
+          Object result = unregisterShuffleMethod.invoke(shuffleClient, appShuffleId, isDriver);
+          return (Boolean) result;
+        }
+      }
+    } catch (ReflectiveOperationException rethrow) {
+      throw new RuntimeException(rethrow);
+    }
+  }
+
+  /**
+   * Obtains a {@link ShuffleClient} through the static {@code ShuffleClient.get} overload that
+   * exists in the Celeborn version on the classpath.
+   *
+   * @param extension handle extension bytes; only passed to the Celeborn 0.4.0 overload
+   * @param isDriver only passed to the Celeborn 0.3.0 overload
+   * @throws RuntimeException wrapping any reflective failure, including when no overload matches
+   */
+  public static ShuffleClient getShuffleClient(
+      String appUniqueId,
+      String lifecycleManagerHost,
+      Integer lifecycleManagerPort,
+      CelebornConf conf,
+      UserIdentifier userIdentifier,
+      Boolean isDriver,
+      byte[] extension) {
+    try {
+      try {
+        try {
+          Method method =
+              // for Celeborn 0.4.0
+              ShuffleClient.class.getDeclaredMethod(
+                  "get",
+                  String.class,
+                  String.class,
+                  int.class,
+                  CelebornConf.class,
+                  UserIdentifier.class,
+                  byte[].class);
+          return (ShuffleClient)
+              method.invoke(
+                  null,
+                  appUniqueId,
+                  lifecycleManagerHost,
+                  lifecycleManagerPort,
+                  conf,
+                  userIdentifier,
+                  extension);
+        } catch (NoSuchMethodException noMethod) {
+          Method method =
+              // for Celeborn 0.3.1 and above, see CELEBORN-804
+              ShuffleClient.class.getDeclaredMethod(
+                  "get",
+                  String.class,
+                  String.class,
+                  int.class,
+                  CelebornConf.class,
+                  UserIdentifier.class);
+          return (ShuffleClient)
+              method.invoke(
+                  null,
+                  appUniqueId,
+                  lifecycleManagerHost,
+                  lifecycleManagerPort,
+                  conf,
+                  userIdentifier);
+        }
+      } catch (NoSuchMethodException noMethod) {
+        Method method =
+            // for Celeborn 0.3.0, see CELEBORN-798
+            ShuffleClient.class.getDeclaredMethod(
+                "get",
+                String.class,
+                String.class,
+                int.class,
+                CelebornConf.class,
+                UserIdentifier.class,
+                boolean.class);
+        return (ShuffleClient)
+            method.invoke(
+                null,
+                appUniqueId,
+                lifecycleManagerHost,
+                lifecycleManagerPort,
+                conf,
+                userIdentifier,
+                isDriver);
+      }
+    } catch (ReflectiveOperationException rethrow) {
+      throw new RuntimeException(rethrow);
+    }
+  }
+
+  /**
+   * Instantiates {@code className} through its public no-arg constructor.
+   *
+   * @return the new instance, or {@code null} if the class is not on the classpath
+   * @throws RuntimeException wrapping any other instantiation failure
+   */
+  public static Object createInstance(String className) {
+    try {
+      try {
+        Class<?> clazz = Class.forName(className);
+
+        Constructor<?> constructor = clazz.getConstructor();
+
+        return constructor.newInstance();
+
+      } catch (ClassNotFoundException e) {
+        return null;
+      }
+    } catch (Exception rethrow) {
+      throw new RuntimeException(rethrow);
+    }
+  }
+
+  /**
+   * Creates a {@link CelebornShuffleHandle}, using the constructor that takes {@code
+   * throwsFetchFailure} when it exists (Celeborn 0.4.0) and otherwise the older constructor, in
+   * which case {@code throwsFetchFailure} is ignored.
+   *
+   * @throws RuntimeException wrapping any reflective failure
+   */
+  public static CelebornShuffleHandle getCelebornShuffleHandle(
+      String appUniqueId,
+      String lifecycleManagerHost,
+      int lifecycleManagerPort,
+      UserIdentifier userIdentifier,
+      int shuffleId,
+      boolean throwsFetchFailure,
+      int numMappers,
+      ShuffleDependency dependency) {
+    try {
+      try {
+        // for Celeborn 0.4.0
+        Constructor<CelebornShuffleHandle> constructor =
+            CelebornShuffleHandle.class.getConstructor(
+                String.class,
+                String.class,
+                int.class,
+                UserIdentifier.class,
+                int.class,
+                boolean.class,
+                int.class,
+                ShuffleDependency.class);
+
+        return constructor.newInstance(
+            appUniqueId,
+            lifecycleManagerHost,
+            lifecycleManagerPort,
+            userIdentifier,
+            shuffleId,
+            throwsFetchFailure,
+            numMappers,
+            dependency);
+      } catch (NoSuchMethodException noMethod) {
+        // for Celeborn 0.3.x: no throwsFetchFailure parameter
+        Constructor<CelebornShuffleHandle> constructor =
+            CelebornShuffleHandle.class.getConstructor(
+                String.class,
+                String.class,
+                int.class,
+                UserIdentifier.class,
+                int.class,
+                int.class,
+                ShuffleDependency.class);
+
+        return constructor.newInstance(
+            appUniqueId,
+            lifecycleManagerHost,
+            lifecycleManagerPort,
+            userIdentifier,
+            shuffleId,
+            numMappers,
+            dependency);
+      }
+    } catch (ReflectiveOperationException rethrow) {
+      throw new RuntimeException(rethrow);
+    }
+  }
+
+  /**
+   * Creates a {@link CelebornShuffleReader}, passing {@code shuffleIdTracker} to the Celeborn
+   * 0.4.0 constructor when it exists and otherwise using the 0.3.x constructor without it.
+   *
+   * @throws RuntimeException wrapping any reflective failure
+   */
+  public static CelebornShuffleReader getCelebornShuffleReader(
+      CelebornShuffleHandle handle,
+      int startPartition,
+      int endPartition,
+      int startMapIndex,
+      int endMapIndex,
+      TaskContext context,
+      CelebornConf conf,
+      ShuffleReadMetricsReporter metrics,
+      Object shuffleIdTracker) {
+    try {
+      try {
+        // for Celeborn 0.4.0
+        // When the tracker class is absent, getClassOrDefault yields Object.class, no constructor
+        // matches, and getConstructor throws NoSuchMethodException.
+        Constructor<CelebornShuffleReader> constructor =
+            CelebornShuffleReader.class.getConstructor(
+                CelebornShuffleHandle.class,
+                int.class,
+                int.class,
+                int.class,
+                int.class,
+                TaskContext.class,
+                CelebornConf.class,
+                ShuffleReadMetricsReporter.class,
+                getClassOrDefault(EXECUTOR_SHUFFLE_ID_TRACKER_NAME));
+
+        return constructor.newInstance(
+            handle,
+            startPartition,
+            endPartition,
+            startMapIndex,
+            endMapIndex,
+            context,
+            conf,
+            metrics,
+            shuffleIdTracker);
+      } catch (NoSuchMethodException noMethod) {
+        // for celeborn 0.3.x
+        Constructor<CelebornShuffleReader> constructor =
+            CelebornShuffleReader.class.getConstructor(
+                CelebornShuffleHandle.class,
+                int.class,
+                int.class,
+                int.class,
+                int.class,
+                TaskContext.class,
+                CelebornConf.class,
+                ShuffleReadMetricsReporter.class);
+
+        return constructor.newInstance(
+            handle,
+            startPartition,
+            endPartition,
+            startMapIndex,
+            endMapIndex,
+            context,
+            conf,
+            metrics);
+      }
+    } catch (ReflectiveOperationException rethrow) {
+      throw new RuntimeException(rethrow);
+    }
+  }
+
+  /**
+   * Loads {@code className}, or returns {@code Object.class} if it is not on the classpath so
+   * that later lookups of Celeborn-specific members on it fail with NoSuchMethodException.
+   */
+  public static Class<?> getClassOrDefault(String className) {
+    try {
+      return Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      return Object.class;
+    }
+  }
+
+  /**
+   * Returns {@code CelebornConf#clientFetchThrowsFetchFailure()}, or {@code false} when the
+   * Celeborn version on the classpath does not define it.
+   */
+  public static boolean getThrowsFetchFailure(CelebornConf celebornConf) {
+    try {
+      Method clientFetchThrowsFetchFailureMethod =
+          celebornConf.getClass().getDeclaredMethod("clientFetchThrowsFetchFailure");
+      return (Boolean) clientFetchThrowsFetchFailureMethod.invoke(celebornConf);
+    } catch (NoSuchMethodException e) {
+      return false;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * If {@code throwsFetchFailure} is set, registers a lifecycle-manager callback that calls
+   * {@code SparkUtils.unregisterAllMapOutput} with Spark's {@link MapOutputTrackerMaster} and the
+   * shuffle id it receives. Logs at debug level and returns if the lifecycle manager has no
+   * {@code registerShuffleTrackerCallback} method (Celeborn 0.3.x).
+   *
+   * <p>Casts {@code SparkEnv.get().mapOutputTracker()} to {@link MapOutputTrackerMaster}, so it
+   * must run on the driver.
+   */
+  public static void registerShuffleTrackerCallback(
+      boolean throwsFetchFailure, LifecycleManager lifecycleManager) {
+    try {
+      if (throwsFetchFailure) {
+        MapOutputTrackerMaster mapOutputTracker =
+            (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
+
+        Method registerShuffleTrackerCallbackMethod =
+            lifecycleManager
+                .getClass()
+                .getDeclaredMethod("registerShuffleTrackerCallback", Consumer.class);
+
+        Consumer<Integer> consumer =
+            shuffleId -> {
+              try {
+                Method unregisterAllMapOutputMethod =
+                    SparkUtils.class.getMethod(
+                        "unregisterAllMapOutput", MapOutputTrackerMaster.class, int.class);
+                unregisterAllMapOutputMethod.invoke(null, mapOutputTracker, shuffleId);
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            };
+
+        registerShuffleTrackerCallbackMethod.invoke(lifecycleManager, consumer);
+      }
+    } catch (NoSuchMethodException e) {
+      logger.debug("Executing the initializeLifecycleManager of celeborn-0.3.x");
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Registers with the lifecycle manager whether the shuffle is determinate, i.e. whether the
+   * dependency RDD's output deterministic level is not {@code INDETERMINATE}. Logs at debug level
+   * and returns if the lifecycle manager has no {@code registerAppShuffleDeterminate} method
+   * (Celeborn 0.3.x).
+   */
+  public static void registerAppShuffleDeterminate(
+      LifecycleManager lifecycleManager, int shuffleId, ShuffleDependency dependency) {
+    try {
+      Method registerAppShuffleDeterminateMethod =
+          lifecycleManager
+              .getClass()
+              .getDeclaredMethod("registerAppShuffleDeterminate", Integer.TYPE, Boolean.TYPE);
+
+      registerAppShuffleDeterminateMethod.invoke(
+          lifecycleManager,
+          shuffleId,
+          dependency.rdd().getOutputDeterministicLevel() != DeterministicLevel.INDETERMINATE());
+
+    } catch (NoSuchMethodException e) {
+      logger.debug("Executing the registerShuffle of celeborn-0.3.x");
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala
 
b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala
index 1310a60ca..205652875 100644
--- 
a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala
+++ 
b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornHashBasedColumnarShuffleWriter.scala
@@ -30,6 +30,7 @@ import org.apache.celeborn.common.CelebornConf
 import java.io.IOException
 
 abstract class CelebornHashBasedColumnarShuffleWriter[K, V](
+    shuffleId: Int,
     handle: CelebornShuffleHandle[K, V, V],
     context: TaskContext,
     celebornConf: CelebornConf,
@@ -38,8 +39,6 @@ abstract class CelebornHashBasedColumnarShuffleWriter[K, V](
   extends ShuffleWriter[K, V]
   with Logging {
 
-  protected val shuffleId: Int = handle.dependency.shuffleId
-
   protected val numMappers: Int = handle.numMappers
 
   protected val numPartitions: Int = handle.dependency.partitioner.numPartitions
diff --git a/gluten-celeborn/pom.xml b/gluten-celeborn/pom.xml
index d6caedd74..3a1f86c8e 100755
--- a/gluten-celeborn/pom.xml
+++ b/gluten-celeborn/pom.xml
@@ -26,6 +26,16 @@
       <artifactId>celeborn-client-spark-${spark.major.version}-shaded_${scala.binary.version}</artifactId>
       <version>${celeborn.version}</version>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.celeborn</groupId>
+          <artifactId>celeborn-client-spark-${spark.major.version}_${scala.binary.version}</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.celeborn</groupId>
+          <artifactId>celeborn-spark-${spark.major.version}-columnar-shuffle_${scala.binary.version}</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git 
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
 
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
index 172328d81..2662c9acd 100644
--- 
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
+++ 
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriter.scala
@@ -38,12 +38,14 @@ import java.io.IOException
 import java.util
 
 class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
+    shuffleId: Int,
     handle: CelebornShuffleHandle[K, V, V],
     context: TaskContext,
     celebornConf: CelebornConf,
     client: ShuffleClient,
     writeMetrics: ShuffleWriteMetricsReporter)
   extends CelebornHashBasedColumnarShuffleWriter[K, V](
+    shuffleId,
     handle,
     context,
     celebornConf,
diff --git 
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriterFactory.scala
 
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriterFactory.scala
index ceaa950c3..fe4a9b938 100644
--- 
a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriterFactory.scala
+++ 
b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornHashBasedColumnarShuffleWriterFactory.scala
@@ -29,12 +29,14 @@ class VeloxCelebornHashBasedColumnarShuffleWriterFactory 
extends CelebornShuffle
   override def backendName(): String = VeloxBackend.BACKEND_NAME
 
   override def createShuffleWriterInstance[K, V](
+      shuffleId: Int,
       handle: CelebornShuffleHandle[K, V, V],
       context: TaskContext,
       celebornConf: CelebornConf,
       client: ShuffleClient,
       writeMetrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
     new VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
+      shuffleId,
       handle,
       context,
       celebornConf,
diff --git a/pom.xml b/pom.xml
index be1a3544d..54c9fd093 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
     <sparkbundle.version>3.2</sparkbundle.version>
     <spark.version>3.4.2</spark.version>
     <sparkshim.artifactId>spark-sql-columnar-shims-spark32</sparkshim.artifactId>
-    <celeborn.version>0.3.0-incubating</celeborn.version>
+    <celeborn.version>0.3.2-incubating</celeborn.version>
     <arrow.version>14.0.1</arrow.version>
     <arrow-memory.artifact>arrow-memory-unsafe</arrow-memory.artifact>
     <hadoop.version>2.7.4</hadoop.version>
diff --git a/tools/gluten-it/package/pom.xml b/tools/gluten-it/package/pom.xml
index 404366c27..29c38c281 100644
--- a/tools/gluten-it/package/pom.xml
+++ b/tools/gluten-it/package/pom.xml
@@ -105,6 +105,16 @@
           <artifactId>celeborn-client-spark-${spark.major.version}-shaded_${scala.binary.version}</artifactId>
           <version>${celeborn.version}</version>
           <scope>runtime</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.celeborn</groupId>
+              <artifactId>celeborn-client-spark-${spark.major.version}_${scala.binary.version}</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.celeborn</groupId>
+              <artifactId>celeborn-spark-${spark.major.version}-columnar-shuffle_${scala.binary.version}</artifactId>
+            </exclusion>
+          </exclusions>
         </dependency>
       </dependencies>
     </profile>
diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml
index 3060c2f3a..7823cd32f 100644
--- a/tools/gluten-it/pom.xml
+++ b/tools/gluten-it/pom.xml
@@ -192,5 +192,11 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>celeborn-0.4</id>
+      <properties>
+        <celeborn.version>0.4.0-incubating</celeborn.version>
+      </properties>
+    </profile>
   </profiles>
 </project>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org


Reply via email to