This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/branch-1.2 by this push:
     new abf0e1dd68 [GLUTEN-7126][VL][1.2] Port #6698 #7525 #7560 for Uniffle bug fix (#7994)
abf0e1dd68 is described below

commit abf0e1dd688ba817ed8f745b242aabbcc05305c9
Author: Wei-Ting Chen <[email protected]>
AuthorDate: Sat Nov 23 23:02:53 2024 +0800

    [GLUTEN-7126][VL][1.2] Port #6698 #7525 #7560 for Uniffle bug fix (#7994)
---
 .../gluten/uniffle/UniffleShuffleManager.java      | 12 +++++++--
 .../writer/VeloxUniffleColumnarShuffleWriter.java  | 31 +++++++++++++++-------
 2 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
index f91141c1eb..b84f6b3b91 100644
--- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
@@ -42,7 +42,8 @@ public class UniffleShuffleManager extends RssShuffleManager {
 
   public UniffleShuffleManager(SparkConf conf, boolean isDriver) {
     super(conf, isDriver);
-    conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED.key(), "false");
+    // FIXME: remove this after https://github.com/apache/incubator-uniffle/pull/2193
+    conf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
   }
 
   @Override
@@ -69,6 +70,13 @@ public class UniffleShuffleManager extends RssShuffleManager {
       } else {
         writeMetrics = context.taskMetrics().shuffleWriteMetrics();
       }
+      // set rss.row.based to false to mark it as columnar shuffle
+      SparkConf conf =
+          sparkConf
+              .clone()
+              .set(
+                  RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED.key(),
+                  "false");
       return new VeloxUniffleColumnarShuffleWriter<>(
           context.partitionId(),
           rssHandle.getAppId(),
@@ -77,7 +85,7 @@ public class UniffleShuffleManager extends RssShuffleManager {
           context.taskAttemptId(),
           writeMetrics,
           this,
-          sparkConf,
+          conf,
           shuffleWriteClient,
           rssHandle,
           this::markFailedTask,
diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index ca5b3ad952..e53605f284 100644
--- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -41,6 +41,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.util.SparkResourceUtil;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.exception.RssException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -125,9 +126,9 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
   }
 
   @Override
-  protected void writeImpl(Iterator<Product2<K, V>> records) throws IOException {
-    if (!records.hasNext() && !isMemoryShuffleEnabled) {
-      super.sendCommit();
+  protected void writeImpl(Iterator<Product2<K, V>> records) {
+    if (!records.hasNext()) {
+      sendCommit();
       return;
     }
     // writer already init
@@ -185,12 +186,19 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
       }
     }
 
-    long startTime = System.nanoTime();
     LOG.info("nativeShuffleWriter value {}", nativeShuffleWriter);
+    // If all of the ColumnarBatch have empty rows, the nativeShuffleWriter still equals -1
     if (nativeShuffleWriter == -1L) {
-      throw new IllegalStateException("nativeShuffleWriter should not be -1L");
+      sendCommit();
+      return;
+    }
+    long startTime = System.nanoTime();
+    SplitResult splitResult;
+    try {
+      splitResult = jniWrapper.stop(nativeShuffleWriter);
+    } catch (IOException e) {
+      throw new RssException(e);
     }
-    splitResult = jniWrapper.stop(nativeShuffleWriter);
     columnarDep
         .metrics()
         .get("splitTime")
@@ -210,9 +218,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
     long pushMergedDataTime = System.nanoTime();
     // clear all
     sendRestBlockAndWait();
-    if (!isMemoryShuffleEnabled) {
-      super.sendCommit();
-    }
+    sendCommit();
     long writeDurationMs = System.nanoTime() - pushMergedDataTime;
     shuffleWriteMetrics.incWriteTime(writeDurationMs);
     LOG.info(
@@ -220,6 +226,13 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
         TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
   }
 
+  @Override
+  protected void sendCommit() {
+    if (!isMemoryShuffleEnabled) {
+      super.sendCommit();
+    }
+  }
+
   @Override
   public Option<MapStatus> stop(boolean success) {
     if (!stopping) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to