[ https://issues.apache.org/jira/browse/SPARK-41129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roy updated SPARK-41129:
------------------------
    Attachment: image-2022-11-14-13-43-48-537.png

> When multiple SQL queries run concurrently, driver subquery threads are permanently 
> blocked
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-41129
>                 URL: https://issues.apache.org/jira/browse/SPARK-41129
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Roy
>            Priority: Major
>         Attachments: image-2022-11-14-13-43-48-537.png, locksqlPNG.PNG, 
> normaljobs.PNG
>
>
> When the SQL runs with only a small amount of concurrency (10 copies at once), each 
> query generates 11 jobs and executes smoothly.
> But when I increased the concurrency to 20, each query executed only its first job 
> and then stopped.
> Looking at the driver thread dump, I found that the subquery threads (20 threads) 
> are blocked; details below.
>  
> {code:java}
> Monitor(org.apache.spark.sql.execution.aggregate.HashAggregateExec@1335537910}),
> Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1502413281}),
> Monitor(org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec@890810300}),
> Monitor(java.lang.Object@603970601}),
> Monitor(org.apache.spark.sql.execution.exchange.ShuffleExchangeExec@2042514973})
> {code}
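> For context, here is roughly how the concurrent load was driven from the client side. 
> This is only a sketch: the table {{t}}, column {{v}}, and the scalar subquery are 
> placeholders, not the actual workload.
> {code:scala}
> import java.util.concurrent.Executors
> 
> import scala.concurrent.{Await, ExecutionContext, Future}
> import scala.concurrent.duration.Duration
> 
> import org.apache.spark.sql.SparkSession
> 
> object ConcurrentSubqueryRepro {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder().appName("subquery-repro").getOrCreate()
> 
>     // Placeholder SQL: what matters is the scalar subquery, which is
>     // evaluated on a driver-side subquery thread (SubqueryExec).
>     val sql = "SELECT * FROM t WHERE v > (SELECT avg(v) FROM t)"
>     val concurrency = 20 // 10 completes smoothly; 20 hangs as described
> 
>     // One client thread per query so all of them are submitted at once.
>     implicit val ec: ExecutionContext =
>       ExecutionContext.fromExecutor(Executors.newFixedThreadPool(concurrency))
> 
>     val runs = (1 to concurrency).map(_ => Future(spark.sql(sql).collect()))
>     runs.foreach(Await.result(_, Duration.Inf))
>     spark.stop()
>   }
> }
> {code}
> The full stack of one of the blocked subquery threads: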
> {code:java}
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
> java.util.concurrent.FutureTask.get(FutureTask.java:191)
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
> org.apache.spark.sql.execution.SubqueryExec.executeCollect(basicPhysicalOperators.scala:861)
> org.apache.spark.sql.execution.ScalarSubquery.updateResult(subquery.scala:80)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$waitForSubqueries$1(SparkPlan.scala:262)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$waitForSubqueries$1$adapted(SparkPlan.scala:261)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3650/586819338.apply(Unknown Source)
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> org.apache.spark.sql.execution.SparkPlan.waitForSubqueries(SparkPlan.scala:261) => holding Monitor(org.apache.spark.sql.execution.aggregate.HashAggregateExec@1335537910})
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:231)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3645/1297667696.apply(Unknown Source)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
> org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
> org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:47)
> org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:660)
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3644/556844527.apply(Unknown Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3645/1297667696.apply(Unknown Source)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:135) => holding Monitor(org.apache.spark.sql.execution.exchange.ShuffleExchangeExec@2042514973})
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:135)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140) => holding Monitor(org.apache.spark.sql.execution.exchange.ShuffleExchangeExec@2042514973})
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike$$Lambda$3671/828122256.apply(Unknown Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3645/1297667696.apply(Unknown Source)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:174) => holding Monitor(org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec@890810300})
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:174)
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:176)
> org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:258)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:256)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3669/118271447.apply(Unknown Source)
> scala.collection.Iterator.foreach(Iterator.scala:943)
> scala.collection.Iterator.foreach$(Iterator.scala:943)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> scala.collection.IterableLike.foreach(IterableLike.scala:74)
> scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:256)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3616/1041219151.apply(Unknown Source)
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:228) => holding Monitor(java.lang.Object@603970601})
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:367)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeTake(AdaptiveSparkPlanExec.scala:344)
> org.apache.spark.sql.execution.SubqueryExec.$anonfun$relationFuture$2(basicPhysicalOperators.scala:834)
> org.apache.spark.sql.execution.SubqueryExec$$Lambda$3652/1074832567.apply(Unknown Source)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:145)
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$3653/1322734277.apply(Unknown Source)
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
> org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:143)
> org.apache.spark.sql.execution.SubqueryExec.$anonfun$relationFuture$1(basicPhysicalOperators.scala:830)
> org.apache.spark.sql.execution.SubqueryExec$$Lambda$3648/502350376.apply(Unknown Source)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191)
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$3649/2139778019.call(Unknown Source)
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:750)
> {code}
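> My reading (not confirmed) is that this looks like thread-pool starvation rather than 
> a lock cycle: every worker of a bounded driver-side subquery pool parks in 
> {{ThreadUtils.awaitResult}}, waiting on work that itself needs a worker from the same 
> pool. A Spark-independent sketch of that pattern, with a made-up pool size and task 
> count:
> {code:scala}
> import java.util.concurrent.Executors
> 
> import scala.concurrent.{Await, ExecutionContext, Future}
> import scala.concurrent.duration._
> 
> // Hypothetical illustration of pool starvation: outer tasks fill a
> // fixed-size pool, then park waiting on inner tasks that are queued
> // behind them on the same pool, so no thread ever makes progress.
> object PoolStarvationSketch {
>   def main(args: Array[String]): Unit = {
>     val poolSize = 16 // made-up bound, standing in for a capped subquery pool
>     implicit val ec: ExecutionContext =
>       ExecutionContext.fromExecutor(Executors.newFixedThreadPool(poolSize))
> 
>     val outer = (1 to 20).map { i =>
>       Future {
>         // Mirrors SubqueryExec.executeCollect -> ThreadUtils.awaitResult:
>         // an in-pool thread parking on another in-pool computation.
>         val inner = Future(i * 2)
>         Await.result(inner, Duration.Inf)
>       }
>     }
>     // With 20 outer tasks and 16 workers this never completes; the
>     // timeout below fires instead, mirroring the observed hang.
>     outer.foreach(Await.result(_, 30.seconds))
>   }
> }
> {code}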
> I'm not sure what's causing this; please let me know if you need any more info, thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
