[ https://issues.apache.org/jira/browse/SPARK-33933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-33933: ------------------------------------ Assignee: (was: Apache Spark) > Broadcast timeout happened unexpectedly in AQE > ----------------------------------------------- > > Key: SPARK-33933 > URL: https://issues.apache.org/jira/browse/SPARK-33933 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 3.0.0, 3.0.1 > Reporter: Yu Zhong > Priority: Major > > In Spark 3.0, when AQE is enabled, there is often broadcast timeout in normal > queries as below. > > {code:java} > Could not execute broadcast in 300 secs. You can increase the timeout for > broadcasts via spark.sql.broadcastTimeout or disable broadcast join by > setting spark.sql.autoBroadcastJoinThreshold to -1 > {code} > > This is usually happens when broadcast join(with or without hint) after a > long running shuffle (more than 5 minutes). By disable AQE, the issues > disappear. > The workaround is to increase spark.sql.broadcastTimeout and it works. But > because the data to broadcast is very small, that doesn't make sense. > After investigation, the root cause should be like this: when enable AQE, in > getFinalPhysicalPlan, spark traversal the physical plan bottom up and create > query stage for materialized part by createQueryStages and materialize those > new created query stages to submit map stages or broadcasting. When > ShuffleQueryStage are materializing before BroadcastQueryStage, the map job > and broadcast job are submitted almost at the same time, but map job will > hold all the computing resources. If the map job runs slow (when lots of data > needs to process and the resource is limited), the broadcast job cannot be > started(and finished) before spark.sql.broadcastTimeout, thus cause whole job > failed (introduced in SPARK-31475). > Code to reproduce: > > {code:java} > import java.util.UUID > import scala.util.Random > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.SparkSession > val spark = SparkSession.builder() > .master("local[2]") > .appName("Test Broadcast").getOrCreate() > import spark.implicits._ > spark.conf.set("spark.sql.adaptive.enabled", "true") > val sc = spark.sparkContext > sc.setLogLevel("INFO") > val uuid = UUID.randomUUID > val df = sc.parallelize(Range(0, 10000), 10000).flatMap(x => { > for (i <- Range(0, 10000 + Random.nextInt(10000))) > yield (x % 26, x, Random.nextInt(10), UUID.randomUUID.toString) > }).toDF("index", "part", "pv", "uuid") > .withColumn("md5", md5($"uuid")) > val dim_data = Range(0, 26).map(x => (('a' + x).toChar.toString, x)) > val dim = dim_data.toDF("name", "index") > val result = df.groupBy("index") > .agg(sum($"pv").alias("pv"), countDistinct("uuid").alias("uv")) > .join(dim, Seq("index")) > .collect(){code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org