This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f52885898 [CORE] Port "SPARK-39983 Should not cache unserialized 
broadcast relations on the driver" (#5149)
f52885898 is described below

commit f52885898c20e99684503e37fe1db97a13244e11
Author: Xiduo You <ulyssesyo...@gmail.com>
AuthorDate: Thu Mar 28 08:55:10 2024 +0800

    [CORE] Port "SPARK-39983 Should not cache unserialized broadcast relations 
on the driver" (#5149)
---
 .../execution/ColumnarBroadcastExchangeExec.scala  |  4 +-
 .../apache/spark/sql/GlutenSQLTestsBaseTrait.scala | 70 ++++++++++++----------
 .../utils/velox/VeloxTestSettings.scala            |  3 +-
 .../execution/GlutenBroadcastExchangeSuite.scala   | 39 +++++++++++-
 .../io/glutenproject/sql/shims/SparkShims.scala    | 10 ++++
 .../sql/shims/spark34/Spark34Shims.scala           |  9 ++-
 .../scala/org/apache/spark/SparkContextUtils.scala | 12 +++-
 .../sql/shims/spark35/Spark35Shims.scala           |  9 ++-
 .../scala/org/apache/spark/SparkContextUtils.scala | 12 +++-
 9 files changed, 124 insertions(+), 44 deletions(-)

diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 645bce76d..b90ff4967 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -80,7 +80,9 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, 
child: SparkPlan)
         val broadcasted = GlutenTimeMetric.millis(longMetric("broadcastTime")) 
{
           _ =>
             // Broadcast the relation
-            sparkContext.broadcast(relation.asInstanceOf[Any])
+            SparkShimLoader.getSparkShims.broadcastInternal(
+              sparkContext,
+              relation.asInstanceOf[Any])
         }
 
         // Update driver metrics
diff --git 
a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala
 
b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala
index 68fa08879..5cd9f3e9c 100644
--- 
a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala
+++ 
b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala
@@ -45,39 +45,7 @@ trait GlutenSQLTestsBaseTrait extends SharedSparkSession 
with GlutenTestsBaseTra
   }
 
   override def sparkConf: SparkConf = {
-    // Native SQL configs
-    val conf = super.sparkConf
-      .setAppName("Gluten-UT")
-      .set("spark.driver.memory", "1G")
-      .set("spark.sql.adaptive.enabled", "true")
-      .set("spark.sql.shuffle.partitions", "1")
-      .set("spark.sql.files.maxPartitionBytes", "134217728")
-      .set("spark.memory.offHeap.enabled", "true")
-      .set("spark.memory.offHeap.size", "1024MB")
-      .set("spark.plugins", "io.glutenproject.GlutenPlugin")
-      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
-      .set("spark.sql.warehouse.dir", warehouse)
-      .set("spark.ui.enabled", "false")
-      .set("spark.gluten.ui.enabled", "false")
-    // Avoid static evaluation by spark catalyst. But there are some UT issues
-    // coming from spark, e.g., expecting SparkException is thrown, but the 
wrapped
-    // exception is thrown.
-    // .set("spark.sql.optimizer.excludedRules", ConstantFolding.ruleName + 
"," +
-    //     NullPropagation.ruleName)
-
-    if (BackendTestUtils.isCHBackendLoaded()) {
-      conf
-        .set("spark.io.compression.codec", "LZ4")
-        .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
-        .set("spark.gluten.sql.enable.native.validation", "false")
-        .set(GlutenConfig.GLUTEN_LIB_PATH, 
SystemParameters.getClickHouseLibPath)
-        .set("spark.sql.files.openCostInBytes", "134217728")
-        .set("spark.unsafe.exceptionOnMemoryLeak", "true")
-    } else {
-      conf.set("spark.unsafe.exceptionOnMemoryLeak", "true")
-    }
-
-    conf
+    GlutenSQLTestsBaseTrait.nativeSparkConf(super.sparkConf, warehouse)
   }
 
   /**
@@ -126,3 +94,39 @@ trait GlutenSQLTestsBaseTrait extends SharedSparkSession 
with GlutenTestsBaseTra
     }
   }
 }
+
+object GlutenSQLTestsBaseTrait {
+  def nativeSparkConf(origin: SparkConf, warehouse: String): SparkConf = {
+    // Native SQL configs
+    val conf = origin
+      .setAppName("Gluten-UT")
+      .set("spark.driver.memory", "1G")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.sql.shuffle.partitions", "1")
+      .set("spark.sql.files.maxPartitionBytes", "134217728")
+      .set("spark.memory.offHeap.enabled", "true")
+      .set("spark.memory.offHeap.size", "1024MB")
+      .set("spark.plugins", "io.glutenproject.GlutenPlugin")
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.sql.warehouse.dir", warehouse)
+      .set("spark.ui.enabled", "false")
+      .set("spark.gluten.ui.enabled", "false")
+    // Avoid static evaluation by spark catalyst. But there are some UT issues
+    // coming from spark, e.g., expecting SparkException is thrown, but the 
wrapped
+    // exception is thrown.
+    // .set("spark.sql.optimizer.excludedRules", ConstantFolding.ruleName + 
"," +
+    //     NullPropagation.ruleName)
+
+    if (BackendTestUtils.isCHBackendLoaded()) {
+      conf
+        .set("spark.io.compression.codec", "LZ4")
+        .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
+        .set("spark.gluten.sql.enable.native.validation", "false")
+        .set(GlutenConfig.GLUTEN_LIB_PATH, 
SystemParameters.getClickHouseLibPath)
+        .set("spark.sql.files.openCostInBytes", "134217728")
+        .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+    } else {
+      conf.set("spark.unsafe.exceptionOnMemoryLeak", "true")
+    }
+  }
+}
diff --git 
a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
index 852ba27f2..84fc83083 100644
--- 
a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql._
 import 
org.apache.spark.sql.catalyst.expressions.{GlutenArithmeticExpressionSuite, 
GlutenBitwiseExpressionsSuite, GlutenCastSuite, 
GlutenCollectionExpressionsSuite, GlutenComplexTypeSuite, 
GlutenConditionalExpressionSuite, GlutenDateExpressionsSuite, 
GlutenDecimalExpressionSuite, GlutenHashExpressionsSuite, 
GlutenIntervalExpressionsSuite, GlutenLiteralExpressionSuite, 
GlutenMathExpressionsSuite, GlutenMiscExpressionsSuite, 
GlutenNondeterministicSuite, GlutenNullExpressionsSuite, GlutenPr [...]
 import 
org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalogSuite, 
GlutenDataSourceV2DataFrameSuite, GlutenDataSourceV2FunctionSuite, 
GlutenDataSourceV2SQLSessionCatalogSuite, GlutenDataSourceV2SQLSuiteV1Filter, 
GlutenDataSourceV2SQLSuiteV2Filter, GlutenDataSourceV2Suite, 
GlutenDeleteFromTableSuite, GlutenFileDataSourceV2FallBackSuite, 
GlutenKeyGroupedPartitioningSuite, GlutenLocalScanSuite, 
GlutenMetadataColumnSuite, GlutenSupportsCatalogOptionsSuite, GlutenTableCapa 
[...]
 import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, 
GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, 
GlutenQueryParsingErrorsSuite}
-import org.apache.spark.sql.execution.{FallbackStrategiesSuite, 
GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, 
GlutenExchangeSuite, GlutenReplaceHashWithSortAggSuite, 
GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, 
GlutenSQLAggregateFunctionSuite, GlutenSQLWindowFunctionSuite, 
GlutenTakeOrderedAndProjectSuite}
+import org.apache.spark.sql.execution.{FallbackStrategiesSuite, 
GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, 
GlutenExchangeSuite, GlutenLocalBroadcastExchangeSuite, 
GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, 
GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, 
GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite}
 import 
org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite
 import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, 
GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, 
GlutenFileFormatWriterSuite, GlutenFileIndexSuite, 
GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite, 
GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite, 
GlutenJsonReadSchemaSuite, GlutenMergedOrcReadSchemaSuite, 
GlutenMergedParquetReadSchemaSuite, GlutenOrcCodecSuite, 
GlutenOrcReadSchemaSuite, GlutenOrcV1Ag [...]
 import 
org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite
@@ -827,6 +827,7 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenOuterJoinSuite]
   enableSuite[FallbackStrategiesSuite]
   enableSuite[GlutenBroadcastExchangeSuite]
+  enableSuite[GlutenLocalBroadcastExchangeSuite]
   enableSuite[GlutenCoalesceShufflePartitionsSuite]
     .excludeByPrefix("determining the number of reducers")
   enableSuite[GlutenExchangeSuite]
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
index 481863354..0f953b6d6 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
@@ -16,6 +16,43 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite}
+import org.apache.spark.broadcast.TorrentBroadcast
+import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, GlutenTestsBaseTrait, 
SparkSession}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.broadcast
 
 class GlutenBroadcastExchangeSuite extends BroadcastExchangeSuite with 
GlutenSQLTestsBaseTrait {}
+
+// Additional tests run in 'local-cluster' mode.
+class GlutenLocalBroadcastExchangeSuite
+  extends SparkFunSuite
+  with LocalSparkContext
+  with GlutenTestsBaseTrait
+  with AdaptiveSparkPlanHelper {
+
+  def newSparkConf(): SparkConf = {
+    val conf = new SparkConf().setMaster("local-cluster[2,1,1024]")
+    GlutenSQLTestsBaseTrait.nativeSparkConf(conf, warehouse)
+  }
+
+  test("SPARK-39983 - Broadcasted relation is not cached on the driver") {
+    // Use a distributed cluster because in local mode the broadcast value is 
actually cached.
+    val conf = newSparkConf()
+    sc = new SparkContext(conf)
+    val spark = new SparkSession(sc)
+
+    val df = spark.range(1).toDF()
+    val joinDF = df.join(broadcast(df), "id")
+    joinDF.collect()
+    val broadcastExchangeExec = collect(joinDF.queryExecution.executedPlan) {
+      case p: ColumnarBroadcastExchangeExec => p
+    }
+    assert(broadcastExchangeExec.size == 1, "one and only 
ColumnarBroadcastExchangeExec")
+
+    // The broadcasted relation should not be cached on the driver.
+    val broadcasted =
+      
broadcastExchangeExec(0).relationFuture.get().asInstanceOf[TorrentBroadcast[Any]]
+    assert(!broadcasted.hasCachedValue)
+  }
+}
diff --git 
a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
index df1f3e451..7d71346a7 100644
--- a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala
@@ -19,6 +19,7 @@ package io.glutenproject.sql.shims
 import io.glutenproject.expression.Sig
 
 import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.scheduler.TaskInfo
 import org.apache.spark.shuffle.ShuffleHandle
@@ -46,6 +47,8 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 
 import java.util.{ArrayList => JArrayList, Map => JMap}
 
+import scala.reflect.ClassTag
+
 sealed abstract class ShimDescriptor
 
 case class SparkShimDescriptor(major: Int, minor: Int, patch: Int) extends 
ShimDescriptor {
@@ -123,6 +126,13 @@ trait SparkShims {
 
   def createTestTaskContext(): TaskContext
 
+  def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] 
= {
+    // Since Spark 3.4, `sc.broadcast` has been optimized to use 
`sc.broadcastInternal`.
+    // For more details, see SPARK-39983.
+    // TODO: remove this shim once we drop support for Spark 3.3 and earlier.
+    sc.broadcast(value)
+  }
+
   // To be compatible with Spark-3.5 and later
   // See https://github.com/apache/spark/pull/41440
   def setJobDescriptionOrTagForBroadcastExchange(
diff --git 
a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
 
b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
index 027e88cbc..2732c190f 100644
--- 
a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
+++ 
b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala
@@ -20,7 +20,8 @@ import io.glutenproject.GlutenConfig
 import io.glutenproject.expression.{ExpressionNames, Sig}
 import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}
 
-import org.apache.spark.{ShuffleUtils, SparkContext, SparkException, 
TaskContext, TaskContextUtils}
+import org.apache.spark.{ShuffleUtils, SparkContext, SparkContextUtils, 
SparkException, TaskContext, TaskContextUtils}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.scheduler.TaskInfo
@@ -53,6 +54,8 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import java.time.ZoneOffset
 import java.util.{HashMap => JHashMap, Map => JMap}
 
+import scala.reflect.ClassTag
+
 class Spark34Shims extends SparkShims {
   override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
 
@@ -241,6 +244,10 @@ class Spark34Shims extends SparkShims {
     TaskContextUtils.createTestTaskContext()
   }
 
+  override def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): 
Broadcast[T] = {
+    SparkContextUtils.broadcastInternal(sc, value)
+  }
+
   def setJobDescriptionOrTagForBroadcastExchange(
       sc: SparkContext,
       broadcastExchange: BroadcastExchangeLike): Unit = {
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
 b/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala
similarity index 74%
copy from 
gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
copy to shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 481863354..3cbf2b602 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -14,8 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution
+package org.apache.spark
 
-import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.broadcast.Broadcast
 
-class GlutenBroadcastExchangeSuite extends BroadcastExchangeSuite with 
GlutenSQLTestsBaseTrait {}
+import scala.reflect.ClassTag
+
+object SparkContextUtils {
+  def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] 
= {
+    sc.broadcastInternal(value, serializedOnly = true)
+  }
+}
diff --git 
a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
index 93dce4fdc..1c87db39f 100644
--- 
a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala
@@ -20,7 +20,8 @@ import io.glutenproject.GlutenConfig
 import io.glutenproject.expression.{ExpressionNames, Sig}
 import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims}
 
-import org.apache.spark.{ShuffleUtils, SparkContext, SparkException, 
TaskContext, TaskContextUtils}
+import org.apache.spark.{ShuffleUtils, SparkContext, SparkContextUtils, 
SparkException, TaskContext, TaskContextUtils}
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.scheduler.TaskInfo
@@ -52,6 +53,8 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import java.time.ZoneOffset
 import java.util.{HashMap => JHashMap, Map => JMap}
 
+import scala.reflect.ClassTag
+
 class Spark35Shims extends SparkShims {
   override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR
 
@@ -238,6 +241,10 @@ class Spark35Shims extends SparkShims {
     TaskContextUtils.createTestTaskContext()
   }
 
+  override def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): 
Broadcast[T] = {
+    SparkContextUtils.broadcastInternal(sc, value)
+  }
+
   override def setJobDescriptionOrTagForBroadcastExchange(
       sc: SparkContext,
       broadcastExchange: BroadcastExchangeLike): Unit = {
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
 b/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala
similarity index 74%
copy from 
gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
copy to shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 481863354..3cbf2b602 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -14,8 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution
+package org.apache.spark
 
-import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.broadcast.Broadcast
 
-class GlutenBroadcastExchangeSuite extends BroadcastExchangeSuite with 
GlutenSQLTestsBaseTrait {}
+import scala.reflect.ClassTag
+
+object SparkContextUtils {
+  def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] 
= {
+    sc.broadcastInternal(value, serializedOnly = true)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org

Reply via email to