This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 63144566d [GLUTEN-6923][CH] `total_bytes_written` is not updated in
celeborn partition writers (#6939)
63144566d is described below
commit 63144566d50a697daebdf2f3c569bddc44e398a5
Author: lgbo <[email protected]>
AuthorDate: Wed Aug 21 15:55:51 2024 +0800
[GLUTEN-6923][CH] `total_bytes_written` is not updated in celeborn
partition writers (#6939)
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
Fixes: #6923
AQE has dependency on shuffle written bytes to generate a proper plan.
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration
tests, manual tests)
manual tests
(If this patch involves UI changes, please attach a screenshot; otherwise,
remove this)
---
.../apache/gluten/backendsapi/clickhouse/CHBackend.scala | 1 -
.../backendsapi/clickhouse/CHSparkPlanExecApi.scala | 16 ++++++++++++----
cpp-ch/local-engine/Shuffle/PartitionWriter.cpp | 2 ++
cpp-ch/local-engine/local_engine_jni.cpp | 5 +++++
4 files changed, 19 insertions(+), 5 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 41ffbdb58..3c151df7e 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -407,5 +407,4 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
}
}
}
-
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 02b4777e7..676126965 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi}
+import org.apache.gluten.exception.GlutenException
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression._
@@ -29,7 +30,8 @@ import
org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
import org.apache.gluten.vectorized.CHColumnarBatchSerializer
-import org.apache.spark.{ShuffleDependency, SparkException}
+import org.apache.spark.ShuffleDependency
+import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters,
GlutenShuffleWriterWrapper, HashPartitioningWrapper}
@@ -62,7 +64,7 @@ import java.util.{ArrayList => JArrayList, List => JList, Map
=> JMap}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-class CHSparkPlanExecApi extends SparkPlanExecApi {
+class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
/** The columnar-batch type this backend is using. */
override def batchType: Convention.BatchType = CHBatch
@@ -532,10 +534,16 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHExecUtil.buildSideRDD(dataSize, newChild).collect
val batches = countsAndBytes.map(_._2)
+ val totalBatchesSize = batches.map(_.length).sum
val rawSize = dataSize.value
if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
- throw new SparkException(
- s"Cannot broadcast the table that is larger than 8GB: ${rawSize >> 30}
GB")
+ throw new GlutenException(
+ s"Cannot broadcast the table that is larger than 8GB: $rawSize bytes")
+ }
+ if ((rawSize == 0 && totalBatchesSize != 0) || totalBatchesSize < 0) {
+ throw new GlutenException(
+ s"Invalid rawSize($rawSize) or totalBatchesSize ($totalBatchesSize).
Ensure the shuffle" +
+ s" written bytes is correct.")
}
val rowCount = countsAndBytes.map(_._1).sum
numOutputRows += rowCount
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index 2f22d0e24..79d640d3b 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -469,6 +469,7 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions()
celeborn_client->pushPartitionData(cur_partition_id,
data.data(), data.size());
shuffle_writer->split_result.total_io_time +=
push_time_watch.elapsedNanoseconds();
shuffle_writer->split_result.partition_lengths[cur_partition_id] += data.size();
+ shuffle_writer->split_result.total_bytes_written +=
data.size();
}
output.restart();
};
@@ -586,6 +587,7 @@ size_t CelebornPartitionWriter::evictSinglePartition(size_t
partition_id)
shuffle_writer->split_result.total_write_time +=
push_time_watch.elapsedNanoseconds();
shuffle_writer->split_result.total_io_time +=
push_time_watch.elapsedNanoseconds();
shuffle_writer->split_result.total_serialize_time +=
serialization_time_watch.elapsedNanoseconds();
+ shuffle_writer->split_result.total_bytes_written += written_bytes;
};
Stopwatch spill_time_watch;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index f27da2f92..c80379a87 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -678,6 +678,11 @@ JNIEXPORT jobject
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_
const auto * raw_src = reinterpret_cast<const jlong
*>(raw_partition_lengths.data());
env->SetLongArrayRegion(raw_partition_length_arr, 0,
raw_partition_lengths.size(), raw_src);
+ // AQE has dependency on total_bytes_written, if the data is wrong, it
will generate inappropriate plan
+ // add a log here for reminding this.
+ if (!result.total_bytes_written)
+ LOG_WARNING(getLogger("CHShuffleSplitterJniWrapper"),
"total_bytes_written is 0, something may be wrong");
+
jobject split_result = env->NewObject(
split_result_class,
split_result_constructor,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]