This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/branch-1.2 by this push:
new abf0e1dd68 [GLUTEN-7126][VL][1.2] Port #6698 #7525 #7560 for Uniffle bug fix (#7994)
abf0e1dd68 is described below
commit abf0e1dd688ba817ed8f745b242aabbcc05305c9
Author: Wei-Ting Chen <[email protected]>
AuthorDate: Sat Nov 23 23:02:53 2024 +0800
[GLUTEN-7126][VL][1.2] Port #6698 #7525 #7560 for Uniffle bug fix (#7994)
---
.../gluten/uniffle/UniffleShuffleManager.java | 12 +++++++--
.../writer/VeloxUniffleColumnarShuffleWriter.java | 31 +++++++++++++++-------
2 files changed, 32 insertions(+), 11 deletions(-)
diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
index f91141c1eb..b84f6b3b91 100644
--- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
@@ -42,7 +42,8 @@ public class UniffleShuffleManager extends RssShuffleManager {
public UniffleShuffleManager(SparkConf conf, boolean isDriver) {
super(conf, isDriver);
- conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED.key(), "false");
+ // FIXME: remove this after https://github.com/apache/incubator-uniffle/pull/2193
+ conf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
}
@Override
@@ -69,6 +70,13 @@ public class UniffleShuffleManager extends RssShuffleManager {
} else {
writeMetrics = context.taskMetrics().shuffleWriteMetrics();
}
+ // set rss.row.based to false to mark it as columnar shuffle
+ SparkConf conf =
+ sparkConf
+ .clone()
+ .set(
+ RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED.key(),
+ "false");
return new VeloxUniffleColumnarShuffleWriter<>(
context.partitionId(),
rssHandle.getAppId(),
@@ -77,7 +85,7 @@ public class UniffleShuffleManager extends RssShuffleManager {
context.taskAttemptId(),
writeMetrics,
this,
- sparkConf,
+ conf,
shuffleWriteClient,
rssHandle,
this::markFailedTask,
diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index ca5b3ad952..e53605f284 100644
--- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -41,6 +41,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.util.SparkResourceUtil;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.exception.RssException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,9 +126,9 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
}
@Override
- protected void writeImpl(Iterator<Product2<K, V>> records) throws IOException {
- if (!records.hasNext() && !isMemoryShuffleEnabled) {
- super.sendCommit();
+ protected void writeImpl(Iterator<Product2<K, V>> records) {
+ if (!records.hasNext()) {
+ sendCommit();
return;
}
// writer already init
@@ -185,12 +186,19 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
}
}
- long startTime = System.nanoTime();
LOG.info("nativeShuffleWriter value {}", nativeShuffleWriter);
+ // If all of the ColumnarBatch have empty rows, the nativeShuffleWriter still equals -1
if (nativeShuffleWriter == -1L) {
- throw new IllegalStateException("nativeShuffleWriter should not be -1L");
+ sendCommit();
+ return;
+ }
+ long startTime = System.nanoTime();
+ SplitResult splitResult;
+ try {
+ splitResult = jniWrapper.stop(nativeShuffleWriter);
+ } catch (IOException e) {
+ throw new RssException(e);
}
- splitResult = jniWrapper.stop(nativeShuffleWriter);
columnarDep
.metrics()
.get("splitTime")
@@ -210,9 +218,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
long pushMergedDataTime = System.nanoTime();
// clear all
sendRestBlockAndWait();
- if (!isMemoryShuffleEnabled) {
- super.sendCommit();
- }
+ sendCommit();
long writeDurationMs = System.nanoTime() - pushMergedDataTime;
shuffleWriteMetrics.incWriteTime(writeDurationMs);
LOG.info(
@@ -220,6 +226,13 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
}
+ @Override
+ protected void sendCommit() {
+ if (!isMemoryShuffleEnabled) {
+ super.sendCommit();
+ }
+ }
+
@Override
public Option<MapStatus> stop(boolean success) {
if (!stopping) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]