[jira] [Updated] (SPARK-35414) Completely fix the broadcast timeout issue in AQE
[ https://issues.apache.org/jira/browse/SPARK-35414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yu Zhong updated SPARK-35414: - Fix Version/s: (was: 3.2.0) > Completely fix the broadcast timeout issue in AQE > - > > Key: SPARK-35414 > URL: https://issues.apache.org/jira/browse/SPARK-35414 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0, 3.0.1 >Reporter: Yu Zhong >Assignee: Yu Zhong >Priority: Major > > SPARK-33933 report a issue that in AQE, when the resources is limited, > broadcast timeout could happened. > [#31269|https://github.com/apache/spark/pull/31269] gives a partial fix by > reorder newStages by class type to make sure BroadcastQueryState precede > others when calling materialized(). However, it only guarantee that the order > of task to be scheduled in normal circumstances, but, the guarantee is not > strict since the submit of broadcast job and shuffle map job are in different > thread. > So we need a completely fix to avoid the edge case triggering broadcast > timeout. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-35414) Completely fix the broadcast timeout issue in AQE
[ https://issues.apache.org/jira/browse/SPARK-35414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yu Zhong updated SPARK-35414: - Description: SPARK-33933 report a issue that in AQE, when the resources is limited, broadcast timeout could happened. [#31269|https://github.com/apache/spark/pull/31269] gives a partial fix by reorder newStages by class type to make sure BroadcastQueryState precede others when calling materialized(). However, it only guarantee that the order of task to be scheduled in normal circumstances, but, the guarantee is not strict since the submit of broadcast job and shuffle map job are in different thread. So we need a completely fix to avoid the edge case triggering broadcast timeout. was: In Spark 3.0, when AQE is enabled, there is often broadcast timeout in normal queries as below. {code:java} Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1 {code} This is usually happens when broadcast join(with or without hint) after a long running shuffle (more than 5 minutes). By disable AQE, the issues disappear. The workaround is to increase spark.sql.broadcastTimeout and it works. But because the data to broadcast is very small, that doesn't make sense. After investigation, the root cause should be like this: when enable AQE, in getFinalPhysicalPlan, spark traversal the physical plan bottom up and create query stage for materialized part by createQueryStages and materialize those new created query stages to submit map stages or broadcasting. When ShuffleQueryStage are materializing before BroadcastQueryStage, the map job and broadcast job are submitted almost at the same time, but map job will hold all the computing resources. If the map job runs slow (when lots of data needs to process and the resource is limited), the broadcast job cannot be started(and finished) before spark.sql.broadcastTimeout, thus cause whole job failed (introduced in SPARK-31475). Code to reproduce: {code:java} import java.util.UUID import scala.util.Random import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local[2]") .appName("Test Broadcast").getOrCreate() import spark.implicits._ spark.conf.set("spark.sql.adaptive.enabled", "true") val sc = spark.sparkContext sc.setLogLevel("INFO") val uuid = UUID.randomUUID val df = sc.parallelize(Range(0, 1), 1).flatMap(x => { for (i <- Range(0, 1 + Random.nextInt(1))) yield (x % 26, x, Random.nextInt(10), UUID.randomUUID.toString) }).toDF("index", "part", "pv", "uuid") .withColumn("md5", md5($"uuid")) val dim_data = Range(0, 26).map(x => (('a' + x).toChar.toString, x)) val dim = dim_data.toDF("name", "index") val result = df.groupBy("index") .agg(sum($"pv").alias("pv"), countDistinct("uuid").alias("uv")) .join(dim, Seq("index")) .collect(){code} > Completely fix the broadcast timeout issue in AQE > - > > Key: SPARK-35414 > URL: https://issues.apache.org/jira/browse/SPARK-35414 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0, 3.0.1 >Reporter: Yu Zhong >Assignee: Yu Zhong >Priority: Major > Fix For: 3.2.0 > > > SPARK-33933 report a issue that in AQE, when the resources is limited, > broadcast timeout could happened. > [#31269|https://github.com/apache/spark/pull/31269] gives a partial fix by > reorder newStages by class type to make sure BroadcastQueryState precede > others when calling materialized(). However, it only guarantee that the order > of task to be scheduled in normal circumstances, but, the guarantee is not > strict since the submit of broadcast job and shuffle map job are in different > thread. > So we need a completely fix to avoid the edge case triggering broadcast > timeout. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-35414) Completely fix the broadcast timeout issue in AQE
Yu Zhong created SPARK-35414: Summary: Completely fix the broadcast timeout issue in AQE Key: SPARK-35414 URL: https://issues.apache.org/jira/browse/SPARK-35414 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.0.0, 3.0.1 Reporter: Yu Zhong Assignee: Yu Zhong Fix For: 3.2.0 In Spark 3.0, when AQE is enabled, there is often broadcast timeout in normal queries as below. {code:java} Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1 {code} This is usually happens when broadcast join(with or without hint) after a long running shuffle (more than 5 minutes). By disable AQE, the issues disappear. The workaround is to increase spark.sql.broadcastTimeout and it works. But because the data to broadcast is very small, that doesn't make sense. After investigation, the root cause should be like this: when enable AQE, in getFinalPhysicalPlan, spark traversal the physical plan bottom up and create query stage for materialized part by createQueryStages and materialize those new created query stages to submit map stages or broadcasting. When ShuffleQueryStage are materializing before BroadcastQueryStage, the map job and broadcast job are submitted almost at the same time, but map job will hold all the computing resources. If the map job runs slow (when lots of data needs to process and the resource is limited), the broadcast job cannot be started(and finished) before spark.sql.broadcastTimeout, thus cause whole job failed (introduced in SPARK-31475). Code to reproduce: {code:java} import java.util.UUID import scala.util.Random import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local[2]") .appName("Test Broadcast").getOrCreate() import spark.implicits._ spark.conf.set("spark.sql.adaptive.enabled", "true") val sc = spark.sparkContext sc.setLogLevel("INFO") val uuid = UUID.randomUUID val df = sc.parallelize(Range(0, 1), 1).flatMap(x => { for (i <- Range(0, 1 + Random.nextInt(1))) yield (x % 26, x, Random.nextInt(10), UUID.randomUUID.toString) }).toDF("index", "part", "pv", "uuid") .withColumn("md5", md5($"uuid")) val dim_data = Range(0, 26).map(x => (('a' + x).toChar.toString, x)) val dim = dim_data.toDF("name", "index") val result = df.groupBy("index") .agg(sum($"pv").alias("pv"), countDistinct("uuid").alias("uv")) .join(dim, Seq("index")) .collect(){code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-33933) Broadcast timeout happened unexpectedly in AQE
[ https://issues.apache.org/jira/browse/SPARK-33933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17257740#comment-17257740 ] Yu Zhong commented on SPARK-33933: -- Hi [~dongjoon], I close PR #[30962|https://github.com/apache/spark/pull/30962] and create another PR [#30998|https://github.com/apache/spark/pull/30998] based on master, sorry for inconvenience. > Broadcast timeout happened unexpectedly in AQE > --- > > Key: SPARK-33933 > URL: https://issues.apache.org/jira/browse/SPARK-33933 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0, 3.0.1 >Reporter: Yu Zhong >Priority: Major > > In Spark 3.0, when AQE is enabled, there is often broadcast timeout in normal > queries as below. > > {code:java} > Could not execute broadcast in 300 secs. You can increase the timeout for > broadcasts via spark.sql.broadcastTimeout or disable broadcast join by > setting spark.sql.autoBroadcastJoinThreshold to -1 > {code} > > This is usually happens when broadcast join(with or without hint) after a > long running shuffle (more than 5 minutes). By disable AQE, the issues > disappear. > The workaround is to increase spark.sql.broadcastTimeout and it works. But > because the data to broadcast is very small, that doesn't make sense. > After investigation, the root cause should be like this: when enable AQE, in > getFinalPhysicalPlan, spark traversal the physical plan bottom up and create > query stage for materialized part by createQueryStages and materialize those > new created query stages to submit map stages or broadcasting. When > ShuffleQueryStage are materializing before BroadcastQueryStage, the map job > and broadcast job are submitted almost at the same time, but map job will > hold all the computing resources. If the map job runs slow (when lots of data > needs to process and the resource is limited), the broadcast job cannot be > started(and finished) before spark.sql.broadcastTimeout, thus cause whole job > failed (introduced in SPARK-31475). > Code to reproduce: > > {code:java} > import java.util.UUID > import scala.util.Random > import org.apache.spark.sql.functions._ > import org.apache.spark.sql.SparkSession > val spark = SparkSession.builder() > .master("local[2]") > .appName("Test Broadcast").getOrCreate() > import spark.implicits._ > spark.conf.set("spark.sql.adaptive.enabled", "true") > val sc = spark.sparkContext > sc.setLogLevel("INFO") > val uuid = UUID.randomUUID > val df = sc.parallelize(Range(0, 1), 1).flatMap(x => { > for (i <- Range(0, 1 + Random.nextInt(1))) > yield (x % 26, x, Random.nextInt(10), UUID.randomUUID.toString) > }).toDF("index", "part", "pv", "uuid") > .withColumn("md5", md5($"uuid")) > val dim_data = Range(0, 26).map(x => (('a' + x).toChar.toString, x)) > val dim = dim_data.toDF("name", "index") > val result = df.groupBy("index") > .agg(sum($"pv").alias("pv"), countDistinct("uuid").alias("uv")) > .join(dim, Seq("index")) > .collect(){code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33933) Broadcast timeout happened unexpectedly in AQE
[ https://issues.apache.org/jira/browse/SPARK-33933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yu Zhong updated SPARK-33933: - Description: In Spark 3.0, when AQE is enabled, there is often broadcast timeout in normal queries as below. {code:java} Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1 {code} This is usually happens when broadcast join(with or without hint) after a long running shuffle (more than 5 minutes). By disable AQE, the issues disappear. The workaround is to increase spark.sql.broadcastTimeout and it works. But because the data to broadcast is very small, that doesn't make sense. After investigation, the root cause should be like this: when enable AQE, in getFinalPhysicalPlan, spark traversal the physical plan bottom up and create query stage for materialized part by createQueryStages and materialize those new created query stages to submit map stages or broadcasting. When ShuffleQueryStage are materializing before BroadcastQueryStage, the map job and broadcast job are submitted almost at the same time, but map job will hold all the computing resources. If the map job runs slow (when lots of data needs to process and the resource is limited), the broadcast job cannot be started(and finished) before spark.sql.broadcastTimeout, thus cause whole job failed (introduced in SPARK-31475). Code to reproduce: {code:java} import java.util.UUID import scala.util.Random import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local[2]") .appName("Test Broadcast").getOrCreate() import spark.implicits._ spark.conf.set("spark.sql.adaptive.enabled", "true") val sc = spark.sparkContext sc.setLogLevel("INFO") val uuid = UUID.randomUUID val df = sc.parallelize(Range(0, 1), 1).flatMap(x => { for (i <- Range(0, 1 + Random.nextInt(1))) yield (x % 26, x, Random.nextInt(10), UUID.randomUUID.toString) }).toDF("index", "part", "pv", "uuid") .withColumn("md5", md5($"uuid")) val dim_data = Range(0, 26).map(x => (('a' + x).toChar.toString, x)) val dim = dim_data.toDF("name", "index") val result = df.groupBy("index") .agg(sum($"pv").alias("pv"), countDistinct("uuid").alias("uv")) .join(dim, Seq("index")) .collect(){code} was: In Spark 3.0, when AQE is enabled, there is often broadcast timeout in normal queries as below. {code:java} Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1 {code} This is usually happens when broadcast join(with or without hint) after a long running shuffle (more than 5 minutes). By disable AQE, the issues disappear. The workaround is to increase spark.sql.broadcastTimeout and it works. But because the data to broadcast is very small, that doesn't make sense. After investigation, the root cause should be like this: when enable AQE, in getFinalPhysicalPlan, spark traversal the physical plan bottom up and create query stage for materialized part by createQueryStages and materialize those new created query stages to submit map stages or broadcasting. When ShuffleQueryStage are materializing before BroadcastQueryStage, the map job and broadcast job are submitted almost at the same time, but map job will hold all the computing resources. If the map job runs slow (when lots of data needs to process and the resource is limited), the broadcast job cannot be started(and finished) before spark.sql.broadcastTimeout, thus cause whole job failed (introduced in SPARK-31475). Code to reproduce: {code:java} import java.util.UUID import scala.util.Random import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local[2]") .appName("Test Broadcast").getOrCreate() import spark.implicits._ val sc = spark.sparkContext sc.setLogLevel("INFO") val uuid = UUID.randomUUID val df = sc.parallelize(Range(0, 1), 1).flatMap(x => { for (i <- Range(0, 1 + Random.nextInt(1))) yield (x % 26, x, Random.nextInt(10), UUID.randomUUID.toString) }).toDF("index", "part", "pv", "uuid") .withColumn("md5", md5($"uuid")) val dim_data = Range(0, 26).map(x => (('a' + x).toChar.toString, x)) val dim = dim_data.toDF("name", "index") val result = df.groupBy("index") .agg(sum($"pv").alias("pv"), countDistinct("uuid").alias("uv")) .join(dim, Seq("index")) .collect(){code} > Broadcast timeout happened unexpectedly in AQE > --- > > Key: SPARK-33933 > URL: https://issues.apache.org/jira/browse/SPARK-33933 > Project: Spark >
[jira] [Created] (SPARK-33933) Broadcast timeout happened unexpectedly in AQE
Yu Zhong created SPARK-33933: Summary: Broadcast timeout happened unexpectedly in AQE Key: SPARK-33933 URL: https://issues.apache.org/jira/browse/SPARK-33933 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.1, 3.0.0 Reporter: Yu Zhong In Spark 3.0, when AQE is enabled, there is often broadcast timeout in normal queries as below. {code:java} Could not execute broadcast in 300 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1 {code} This is usually happens when broadcast join(with or without hint) after a long running shuffle (more than 5 minutes). By disable AQE, the issues disappear. The workaround is to increase spark.sql.broadcastTimeout and it works. But because the data to broadcast is very small, that doesn't make sense. After investigation, the root cause should be like this: when enable AQE, in getFinalPhysicalPlan, spark traversal the physical plan bottom up and create query stage for materialized part by createQueryStages and materialize those new created query stages to submit map stages or broadcasting. When ShuffleQueryStage are materializing before BroadcastQueryStage, the map job and broadcast job are submitted almost at the same time, but map job will hold all the computing resources. If the map job runs slow (when lots of data needs to process and the resource is limited), the broadcast job cannot be started(and finished) before spark.sql.broadcastTimeout, thus cause whole job failed (introduced in SPARK-31475). Code to reproduce: {code:java} import java.util.UUID import scala.util.Random import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local[2]") .appName("Test Broadcast").getOrCreate() import spark.implicits._ val sc = spark.sparkContext sc.setLogLevel("INFO") val uuid = UUID.randomUUID val df = sc.parallelize(Range(0, 1), 1).flatMap(x => { for (i <- Range(0, 1 + Random.nextInt(1))) yield (x % 26, x, Random.nextInt(10), UUID.randomUUID.toString) }).toDF("index", "part", "pv", "uuid") .withColumn("md5", md5($"uuid")) val dim_data = Range(0, 26).map(x => (('a' + x).toChar.toString, x)) val dim = dim_data.toDF("name", "index") val result = df.groupBy("index") .agg(sum($"pv").alias("pv"), countDistinct("uuid").alias("uv")) .join(dim, Seq("index")) .collect(){code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org