[ https://issues.apache.org/jira/browse/SPARK-36901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601030#comment-17601030 ]

Igor Uchôa edited comment on SPARK-36901 at 9/6/22 10:06 PM:
-------------------------------------------------------------

I'm facing this too. In my case, we have a shared cluster where multiple 
applications run at the same time, all using Dynamic Allocation. Depending on 
the cluster load, an application gets a broadcast timeout error. In my opinion, 
resource allocation delays shouldn't cause errors in the application, 
especially broadcast errors, which seem unrelated to the problem at first 
glance.

The only parameter we have to work around this issue is 
`spark.sql.broadcastTimeout`, which affects all broadcast operations. If we set 
it to a huge (or effectively infinite) value, we avoid the timeout error, but 
we also allow users to explicitly broadcast huge datasets in their queries, 
which seems really bad from my perspective.
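
For reference, this is the only knob we have today. A minimal workaround sketch 
(PySpark; the 3600-second value is just an example, not a recommendation):
{code:java}
# Workaround sketch: raise spark.sql.broadcastTimeout for the whole application.
# A huge value hides resource-allocation delays, but it also hides genuinely
# oversized broadcasts, which is the trade-off described above.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Broadcast Timeout Workaround")
    .config("spark.sql.broadcastTimeout", "3600")  # seconds; default is 300
    .getOrCreate()
)
{code}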

Since the broadcast exchange runs as a Spark job, it triggers the lazy 
evaluation and, depending on resource availability, it either starts 
immediately or, in the worst case, waits until the application has enough 
executors. The problem is that the timer starts counting the moment the job is 
submitted, not the moment its first task starts running. So if resource 
allocation is delayed, the application will eventually fire the broadcast 
timeout error: 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L211

My suggestion is to change the timer approach and start it only once the 
application has at least one running task. That way we would be able to use 
`spark.sql.broadcastTimeout` properly (see the sketch below). Please let me 
know your thoughts.
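
To make the distinction concrete, here is a minimal, framework-free sketch 
(plain Python, not Spark internals; the function and parameter names are 
hypothetical) contrasting a timeout counted from job submission with one 
counted from the first running task:
{code:java}
# Illustrative model only: a thread stands in for the broadcast job.
import threading
import time


def broadcast_with_timeout(alloc_delay_s, build_s, timeout_s, start_timer_at_first_task):
    """Model a broadcast job: alloc_delay_s is the wait for executors,
    build_s is the time spent building the broadcast relation."""
    first_task_started = threading.Event()

    def job():
        time.sleep(alloc_delay_s)      # waiting for executor allocation
        first_task_started.set()       # the first task starts running here
        time.sleep(build_s)            # building the broadcast relation

    worker = threading.Thread(target=job)
    worker.start()
    if start_timer_at_first_task:
        first_task_started.wait()      # proposed: don't count the allocation delay
    worker.join(timeout=timeout_s)     # the timeout countdown starts here
    return not worker.is_alive()       # True -> broadcast finished within the timeout


# Current behaviour: a 4s allocation delay alone exhausts a 3s timeout.
print(broadcast_with_timeout(4, 1, 3, start_timer_at_first_task=False))  # False
# Proposed behaviour: the same delay no longer counts against the timeout.
print(broadcast_with_timeout(4, 1, 3, start_timer_at_first_task=True))   # True
{code}
With the second approach, `spark.sql.broadcastTimeout` would only bound the 
time spent actually building and broadcasting the relation, not the time spent 
waiting for executors.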



> ERROR exchange.BroadcastExchangeExec: Could not execute broadcast in 300 secs
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-36901
>                 URL: https://issues.apache.org/jira/browse/SPARK-36901
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.4.0
>            Reporter: Ranga Reddy
>            Priority: Major
>
> While running a Spark application, if there are no resources available to 
> launch executors, the application fails after 5 minutes with the exception below.
> {code:java}
> 21/09/24 06:12:45 WARN cluster.YarnScheduler: Initial job has not accepted 
> any resources; check your cluster UI to ensure that workers are registered 
> and have sufficient resources
> ...
> 21/09/24 06:17:29 ERROR exchange.BroadcastExchangeExec: Could not execute 
> broadcast in 300 secs.
> java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
> ...
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
> [300 seconds]
>       at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
>       at 
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
>       at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
>       at 
> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:146)
>       ... 71 more
> 21/09/24 06:17:30 INFO spark.SparkContext: Invoking stop() from shutdown hook
> {code}
> *Expectation:* Spark should either throw a proper exception saying 
> *"there are no further resources to run the application"* or it should 
> *wait until it gets resources*.
> To reproduce the issue we have used the following sample code.
> *PySpark Code (test_broadcast_timeout.py):*
> {code:java}
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.appName("Test Broadcast Timeout").getOrCreate()
> t1 = spark.range(5)
> t2 = spark.range(5)
> q = t1.join(t2,t1.id == t2.id)
> q.explain()
> q.show(){code}
> *Spark Submit Command:*
> {code:java}
> spark-submit --executor-memory 512M test_broadcast_timeout.py{code}
>  Note: We have tested the same code on Spark 3.1 and we are able to reproduce 
> the issue in Spark 3 as well.


