This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 63144566d [GLUTEN-6923][CH] `total_bytes_written` is not updated in 
celeborn partition writers (#6939)
63144566d is described below

commit 63144566d50a697daebdf2f3c569bddc44e398a5
Author: lgbo <[email protected]>
AuthorDate: Wed Aug 21 15:55:51 2024 +0800

    [GLUTEN-6923][CH] `total_bytes_written` is not updated in celeborn 
partition writers (#6939)
    
    What changes were proposed in this pull request?
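    Update `split_result.total_bytes_written` in the Celeborn partition writers
    (`MemorySortCelebornPartitionWriter::evictPartitions` and
    `CelebornPartitionWriter::evictSinglePartition`), log a warning in the JNI
    wrapper when `total_bytes_written` is 0, and add a sanity check on `rawSize`
    and the total batch size in the broadcast path of `CHSparkPlanExecApi`.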
    
    Fixes: #6923
    
    AQE depends on the shuffle written bytes to generate a proper plan.
    
    How was this patch tested?
    (Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
    
    manual tests
    
    (If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)
---
 .../apache/gluten/backendsapi/clickhouse/CHBackend.scala |  1 -
 .../backendsapi/clickhouse/CHSparkPlanExecApi.scala      | 16 ++++++++++++----
 cpp-ch/local-engine/Shuffle/PartitionWriter.cpp          |  2 ++
 cpp-ch/local-engine/local_engine_jni.cpp                 |  5 +++++
 4 files changed, 19 insertions(+), 5 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 41ffbdb58..3c151df7e 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -407,5 +407,4 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
         }
       }
   }
-
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 02b4777e7..676126965 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi}
+import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
 import org.apache.gluten.expression._
@@ -29,7 +30,8 @@ import 
org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode
 import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
 import org.apache.gluten.vectorized.CHColumnarBatchSerializer
 
-import org.apache.spark.{ShuffleDependency, SparkException}
+import org.apache.spark.ShuffleDependency
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{GenShuffleWriterParameters, 
GlutenShuffleWriterWrapper, HashPartitioningWrapper}
@@ -62,7 +64,7 @@ import java.util.{ArrayList => JArrayList, List => JList, Map 
=> JMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-class CHSparkPlanExecApi extends SparkPlanExecApi {
+class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
 
   /** The columnar-batch type this backend is using. */
   override def batchType: Convention.BatchType = CHBatch
@@ -532,10 +534,16 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       CHExecUtil.buildSideRDD(dataSize, newChild).collect
 
     val batches = countsAndBytes.map(_._2)
+    val totalBatchesSize = batches.map(_.length).sum
     val rawSize = dataSize.value
     if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
-      throw new SparkException(
-        s"Cannot broadcast the table that is larger than 8GB: ${rawSize >> 30} 
GB")
+      throw new GlutenException(
+        s"Cannot broadcast the table that is larger than 8GB: $rawSize bytes")
+    }
+    if ((rawSize == 0 && totalBatchesSize != 0) || totalBatchesSize < 0) {
+      throw new GlutenException(
+        s"Invalid rawSize($rawSize) or totalBatchesSize ($totalBatchesSize). 
Ensure the shuffle" +
+          s" written bytes is correct.")
     }
     val rowCount = countsAndBytes.map(_._1).sum
     numOutputRows += rowCount
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp 
b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index 2f22d0e24..79d640d3b 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -469,6 +469,7 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions()
                 celeborn_client->pushPartitionData(cur_partition_id, 
data.data(), data.size());
                 shuffle_writer->split_result.total_io_time += 
push_time_watch.elapsedNanoseconds();
                 
shuffle_writer->split_result.partition_lengths[cur_partition_id] += data.size();
+                shuffle_writer->split_result.total_bytes_written += 
data.size();
             }
             output.restart();
         };
@@ -586,6 +587,7 @@ size_t CelebornPartitionWriter::evictSinglePartition(size_t 
partition_id)
         shuffle_writer->split_result.total_write_time += 
push_time_watch.elapsedNanoseconds();
         shuffle_writer->split_result.total_io_time += 
push_time_watch.elapsedNanoseconds();
         shuffle_writer->split_result.total_serialize_time += 
serialization_time_watch.elapsedNanoseconds();
+        shuffle_writer->split_result.total_bytes_written += written_bytes;
     };
 
     Stopwatch spill_time_watch;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index f27da2f92..c80379a87 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -678,6 +678,11 @@ JNIEXPORT jobject 
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_
     const auto * raw_src = reinterpret_cast<const jlong 
*>(raw_partition_lengths.data());
     env->SetLongArrayRegion(raw_partition_length_arr, 0, 
raw_partition_lengths.size(), raw_src);
 
+    // AQE depends on total_bytes_written; if the value is wrong, it 
will generate an inappropriate plan.
+    // Log a warning here as a reminder.
+    if (!result.total_bytes_written)
+        LOG_WARNING(getLogger("CHShuffleSplitterJniWrapper"), 
"total_bytes_written is 0, something may be wrong");
+
     jobject split_result = env->NewObject(
         split_result_class,
         split_result_constructor,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to