This is an automated email from the ASF dual-hosted git repository. ulyssesyou pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push: new f52885898 [CORE] Port "SPARK-39983 Should not cache unserialized broadcast relations on the driver" (#5149) f52885898 is described below commit f52885898c20e99684503e37fe1db97a13244e11 Author: Xiduo You <ulyssesyo...@gmail.com> AuthorDate: Thu Mar 28 08:55:10 2024 +0800 [CORE] Port "SPARK-39983 Should not cache unserialized broadcast relations on the driver" (#5149) --- .../execution/ColumnarBroadcastExchangeExec.scala | 4 +- .../apache/spark/sql/GlutenSQLTestsBaseTrait.scala | 70 ++++++++++++---------- .../utils/velox/VeloxTestSettings.scala | 3 +- .../execution/GlutenBroadcastExchangeSuite.scala | 39 +++++++++++- .../io/glutenproject/sql/shims/SparkShims.scala | 10 ++++ .../sql/shims/spark34/Spark34Shims.scala | 9 ++- .../scala/org/apache/spark/SparkContextUtils.scala | 12 +++- .../sql/shims/spark35/Spark35Shims.scala | 9 ++- .../scala/org/apache/spark/SparkContextUtils.scala | 12 +++- 9 files changed, 124 insertions(+), 44 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index 645bce76d..b90ff4967 100644 --- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -80,7 +80,9 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) val broadcasted = GlutenTimeMetric.millis(longMetric("broadcastTime")) { _ => // Broadcast the relation - sparkContext.broadcast(relation.asInstanceOf[Any]) + SparkShimLoader.getSparkShims.broadcastInternal( + sparkContext, + relation.asInstanceOf[Any]) } // Update driver metrics diff --git a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala 
b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala index 68fa08879..5cd9f3e9c 100644 --- a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala +++ b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala @@ -45,39 +45,7 @@ trait GlutenSQLTestsBaseTrait extends SharedSparkSession with GlutenTestsBaseTra } override def sparkConf: SparkConf = { - // Native SQL configs - val conf = super.sparkConf - .setAppName("Gluten-UT") - .set("spark.driver.memory", "1G") - .set("spark.sql.adaptive.enabled", "true") - .set("spark.sql.shuffle.partitions", "1") - .set("spark.sql.files.maxPartitionBytes", "134217728") - .set("spark.memory.offHeap.enabled", "true") - .set("spark.memory.offHeap.size", "1024MB") - .set("spark.plugins", "io.glutenproject.GlutenPlugin") - .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") - .set("spark.sql.warehouse.dir", warehouse) - .set("spark.ui.enabled", "false") - .set("spark.gluten.ui.enabled", "false") - // Avoid static evaluation by spark catalyst. But there are some UT issues - // coming from spark, e.g., expecting SparkException is thrown, but the wrapped - // exception is thrown. 
- // .set("spark.sql.optimizer.excludedRules", ConstantFolding.ruleName + "," + - // NullPropagation.ruleName) - - if (BackendTestUtils.isCHBackendLoaded()) { - conf - .set("spark.io.compression.codec", "LZ4") - .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1") - .set("spark.gluten.sql.enable.native.validation", "false") - .set(GlutenConfig.GLUTEN_LIB_PATH, SystemParameters.getClickHouseLibPath) - .set("spark.sql.files.openCostInBytes", "134217728") - .set("spark.unsafe.exceptionOnMemoryLeak", "true") - } else { - conf.set("spark.unsafe.exceptionOnMemoryLeak", "true") - } - - conf + GlutenSQLTestsBaseTrait.nativeSparkConf(super.sparkConf, warehouse) } /** @@ -126,3 +94,39 @@ trait GlutenSQLTestsBaseTrait extends SharedSparkSession with GlutenTestsBaseTra } } } + +object GlutenSQLTestsBaseTrait { + def nativeSparkConf(origin: SparkConf, warehouse: String): SparkConf = { + // Native SQL configs + val conf = origin + .setAppName("Gluten-UT") + .set("spark.driver.memory", "1G") + .set("spark.sql.adaptive.enabled", "true") + .set("spark.sql.shuffle.partitions", "1") + .set("spark.sql.files.maxPartitionBytes", "134217728") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "1024MB") + .set("spark.plugins", "io.glutenproject.GlutenPlugin") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.sql.warehouse.dir", warehouse) + .set("spark.ui.enabled", "false") + .set("spark.gluten.ui.enabled", "false") + // Avoid static evaluation by spark catalyst. But there are some UT issues + // coming from spark, e.g., expecting SparkException is thrown, but the wrapped + // exception is thrown. 
+ // .set("spark.sql.optimizer.excludedRules", ConstantFolding.ruleName + "," + + // NullPropagation.ruleName) + + if (BackendTestUtils.isCHBackendLoaded()) { + conf + .set("spark.io.compression.codec", "LZ4") + .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1") + .set("spark.gluten.sql.enable.native.validation", "false") + .set(GlutenConfig.GLUTEN_LIB_PATH, SystemParameters.getClickHouseLibPath) + .set("spark.sql.files.openCostInBytes", "134217728") + .set("spark.unsafe.exceptionOnMemoryLeak", "true") + } else { + conf.set("spark.unsafe.exceptionOnMemoryLeak", "true") + } + } +} diff --git a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala index 852ba27f2..84fc83083 100644 --- a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/velox/VeloxTestSettings.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{GlutenArithmeticExpressionSuite, GlutenBitwiseExpressionsSuite, GlutenCastSuite, GlutenCollectionExpressionsSuite, GlutenComplexTypeSuite, GlutenConditionalExpressionSuite, GlutenDateExpressionsSuite, GlutenDecimalExpressionSuite, GlutenHashExpressionsSuite, GlutenIntervalExpressionsSuite, GlutenLiteralExpressionSuite, GlutenMathExpressionsSuite, GlutenMiscExpressionsSuite, GlutenNondeterministicSuite, GlutenNullExpressionsSuite, GlutenPr [...] 
import org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalogSuite, GlutenDataSourceV2DataFrameSuite, GlutenDataSourceV2FunctionSuite, GlutenDataSourceV2SQLSessionCatalogSuite, GlutenDataSourceV2SQLSuiteV1Filter, GlutenDataSourceV2SQLSuiteV2Filter, GlutenDataSourceV2Suite, GlutenDeleteFromTableSuite, GlutenFileDataSourceV2FallBackSuite, GlutenKeyGroupedPartitioningSuite, GlutenLocalScanSuite, GlutenMetadataColumnSuite, GlutenSupportsCatalogOptionsSuite, GlutenTableCapa [...] import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, GlutenQueryParsingErrorsSuite} -import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite} +import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, GlutenLocalBroadcastExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite} import org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, GlutenFileFormatWriterSuite, GlutenFileIndexSuite, GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite, GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite, GlutenJsonReadSchemaSuite, GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite, GlutenOrcCodecSuite, GlutenOrcReadSchemaSuite, 
GlutenOrcV1Ag [...] import org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite @@ -827,6 +827,7 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenOuterJoinSuite] enableSuite[FallbackStrategiesSuite] enableSuite[GlutenBroadcastExchangeSuite] + enableSuite[GlutenLocalBroadcastExchangeSuite] enableSuite[GlutenCoalesceShufflePartitionsSuite] .excludeByPrefix("determining the number of reducers") enableSuite[GlutenExchangeSuite] diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala index 481863354..0f953b6d6 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala @@ -16,6 +16,43 @@ */ package org.apache.spark.sql.execution -import org.apache.spark.sql.GlutenSQLTestsBaseTrait +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.broadcast.TorrentBroadcast +import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, GlutenTestsBaseTrait, SparkSession} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.functions.broadcast class GlutenBroadcastExchangeSuite extends BroadcastExchangeSuite with GlutenSQLTestsBaseTrait {} + +// Additional tests run in 'local-cluster' mode. 
+class GlutenLocalBroadcastExchangeSuite + extends SparkFunSuite + with LocalSparkContext + with GlutenTestsBaseTrait + with AdaptiveSparkPlanHelper { + + def newSparkConf(): SparkConf = { + val conf = new SparkConf().setMaster("local-cluster[2,1,1024]") + GlutenSQLTestsBaseTrait.nativeSparkConf(conf, warehouse) + } + + test("SPARK-39983 - Broadcasted relation is not cached on the driver") { + // Use distributed cluster as in local mode the broadcast value is actually cached. + val conf = newSparkConf() + sc = new SparkContext(conf) + val spark = new SparkSession(sc) + + val df = spark.range(1).toDF() + val joinDF = df.join(broadcast(df), "id") + joinDF.collect() + val broadcastExchangeExec = collect(joinDF.queryExecution.executedPlan) { + case p: ColumnarBroadcastExchangeExec => p + } + assert(broadcastExchangeExec.size == 1, "one and only ColumnarBroadcastExchangeExec") + + // The broadcasted relation should not be cached on the driver. + val broadcasted = + broadcastExchangeExec(0).relationFuture.get().asInstanceOf[TorrentBroadcast[Any]] + assert(!broadcasted.hasCachedValue) + } +} diff --git a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala index df1f3e451..7d71346a7 100644 --- a/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala +++ b/shims/common/src/main/scala/io/glutenproject/sql/shims/SparkShims.scala @@ -19,6 +19,7 @@ package io.glutenproject.sql.shims import io.glutenproject.expression.Sig import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.scheduler.TaskInfo import org.apache.spark.shuffle.ShuffleHandle @@ -46,6 +47,8 @@ import org.apache.hadoop.fs.{FileStatus, Path} import java.util.{ArrayList => JArrayList, Map => JMap} +import scala.reflect.ClassTag + sealed abstract class ShimDescriptor case class
SparkShimDescriptor(major: Int, minor: Int, patch: Int) extends ShimDescriptor { @@ -123,6 +126,13 @@ trait SparkShims { def createTestTaskContext(): TaskContext + def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = { + // Since Spark 3.4, the `sc.broadcast` has been optimized to use `sc.broadcastInternal`. + // More details see SPARK-39983. + // TODO, remove this shim once we drop Spark3.3 and previous + sc.broadcast(value) + } + // To be compatible with Spark-3.5 and later // See https://github.com/apache/spark/pull/41440 def setJobDescriptionOrTagForBroadcastExchange( diff --git a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala index 027e88cbc..2732c190f 100644 --- a/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala +++ b/shims/spark34/src/main/scala/io/glutenproject/sql/shims/spark34/Spark34Shims.scala @@ -20,7 +20,8 @@ import io.glutenproject.GlutenConfig import io.glutenproject.expression.{ExpressionNames, Sig} import io.glutenproject.sql.shims.{ShimDescriptor, SparkShims} -import org.apache.spark.{ShuffleUtils, SparkContext, SparkException, TaskContext, TaskContextUtils} +import org.apache.spark.{ShuffleUtils, SparkContext, SparkContextUtils, SparkException, TaskContext, TaskContextUtils} +import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.paths.SparkPath import org.apache.spark.scheduler.TaskInfo @@ -53,6 +54,8 @@ import org.apache.hadoop.fs.{FileStatus, Path} import java.time.ZoneOffset import java.util.{HashMap => JHashMap, Map => JMap} +import scala.reflect.ClassTag + class Spark34Shims extends SparkShims { override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR @@ -241,6 +244,10 @@ class Spark34Shims extends SparkShims { TaskContextUtils.createTestTaskContext() } + override def 
broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = { + SparkContextUtils.broadcastInternal(sc, value) + } + def setJobDescriptionOrTagForBroadcastExchange( sc: SparkContext, broadcastExchange: BroadcastExchangeLike): Unit = { diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala b/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala similarity index 74% copy from gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala copy to shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala index 481863354..3cbf2b602 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala +++ b/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala @@ -14,8 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.sql.execution +package org.apache.spark -import org.apache.spark.sql.GlutenSQLTestsBaseTrait +import org.apache.spark.broadcast.Broadcast -class GlutenBroadcastExchangeSuite extends BroadcastExchangeSuite with GlutenSQLTestsBaseTrait {} +import scala.reflect.ClassTag + +object SparkContextUtils { + def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = { + sc.broadcastInternal(value, serializedOnly = true) + } +} diff --git a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala index 93dce4fdc..1c87db39f 100644 --- a/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala +++ b/shims/spark35/src/main/scala/io/glutenproject/sql/shims/spark35/Spark35Shims.scala @@ -20,7 +20,8 @@ import io.glutenproject.GlutenConfig import io.glutenproject.expression.{ExpressionNames, Sig} import 
io.glutenproject.sql.shims.{ShimDescriptor, SparkShims} -import org.apache.spark.{ShuffleUtils, SparkContext, SparkException, TaskContext, TaskContextUtils} +import org.apache.spark.{ShuffleUtils, SparkContext, SparkContextUtils, SparkException, TaskContext, TaskContextUtils} +import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.paths.SparkPath import org.apache.spark.scheduler.TaskInfo @@ -52,6 +53,8 @@ import org.apache.hadoop.fs.{FileStatus, Path} import java.time.ZoneOffset import java.util.{HashMap => JHashMap, Map => JMap} +import scala.reflect.ClassTag + class Spark35Shims extends SparkShims { override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR @@ -238,6 +241,10 @@ class Spark35Shims extends SparkShims { TaskContextUtils.createTestTaskContext() } + override def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = { + SparkContextUtils.broadcastInternal(sc, value) + } + override def setJobDescriptionOrTagForBroadcastExchange( sc: SparkContext, broadcastExchange: BroadcastExchangeLike): Unit = { diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala b/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala similarity index 74% copy from gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala copy to shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala index 481863354..3cbf2b602 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala +++ b/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala @@ -14,8 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.sql.execution +package org.apache.spark -import org.apache.spark.sql.GlutenSQLTestsBaseTrait +import org.apache.spark.broadcast.Broadcast -class GlutenBroadcastExchangeSuite extends BroadcastExchangeSuite with GlutenSQLTestsBaseTrait {} +import scala.reflect.ClassTag + +object SparkContextUtils { + def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = { + sc.broadcastInternal(value, serializedOnly = true) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org For additional commands, e-mail: commits-h...@gluten.apache.org