Yu Zhong created SPARK-33933:
--------------------------------

             Summary: Broadcast timeout happened unexpectedly in AQE 
                 Key: SPARK-33933
                 URL: https://issues.apache.org/jira/browse/SPARK-33933
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.1, 3.0.0
            Reporter: Yu Zhong


In Spark 3.0, when AQE is enabled, there is often broadcast timeout in normal 
queries as below.

 
{code:java}
Could not execute broadcast in 300 secs. You can increase the timeout for 
broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting 
spark.sql.autoBroadcastJoinThreshold to -1
{code}
 

This is usually happens when broadcast join(with or without hint) after a long 
running shuffle (more than 5 minutes).  By disable AQE, the issues disappear.

The workaround is to increase spark.sql.broadcastTimeout and it works. But 
because the data to broadcast is very small, that doesn't make sense.

After investigation, the root cause should be like this: when enable AQE, in 
getFinalPhysicalPlan, spark traversal the physical plan bottom up and create 
query stage for materialized part by createQueryStages and materialize those 
new created query stages to submit map stages or broadcasting. When 
ShuffleQueryStage are materializing before BroadcastQueryStage, the map job and 
broadcast job are submitted almost at the same time, but map job will hold all 
the computing resources. If the map job runs slow (when lots of data needs to 
process and the resource is limited), the broadcast job cannot be started(and 
finished) before spark.sql.broadcastTimeout, thus cause whole job failed 
(introduced in SPARK-31475).

Code to reproduce:

 
{code:java}
import java.util.UUID
import scala.util.Random
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("Test Broadcast").getOrCreate()
import spark.implicits._

val sc = spark.sparkContext
sc.setLogLevel("INFO")
val uuid = UUID.randomUUID

val df = sc.parallelize(Range(0, 10000), 10000).flatMap(x => {
  for (i <- Range(0, 10000 + Random.nextInt(10000)))
    yield (x % 26, x, Random.nextInt(10), UUID.randomUUID.toString)
}).toDF("index", "part", "pv", "uuid")
  .withColumn("md5", md5($"uuid"))

val dim_data = Range(0, 26).map(x => (('a' + x).toChar.toString, x))
val dim = dim_data.toDF("name", "index")

val result = df.groupBy("index")
  .agg(sum($"pv").alias("pv"), countDistinct("uuid").alias("uv"))
  .join(dim, Seq("index"))
  .collect(){code}
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to