[ https://issues.apache.org/jira/browse/SPARK-41129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17754048#comment-17754048 ]
JacobZheng commented on SPARK-41129:
------------------------------------

I'm experiencing the exact same thing, and I'm not sure whether it has anything to do with concurrency. Did you find a cause or a solution to the problem?

> When multiple SQLs are concurrent, the driver subquery thread is permanently locked
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-41129
>                 URL: https://issues.apache.org/jira/browse/SPARK-41129
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Roy
>            Priority: Major
>         Attachments: locksqlPNG.PNG, normaljobs.PNG
>
> With a small amount of concurrency (10 SQLs), each SQL generates 11 jobs and executes smoothly (please refer to the attached picture).
> But when I increased the concurrency to 20, each SQL executed only its first job and then stopped (please refer to the attached picture).
> A driver thread dump shows that the subquery threads (20 threads) are blocked; details below.
>
> {code:java}
> Monitor(org.apache.spark.sql.execution.aggregate.HashAggregateExec@1335537910}),
> Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1502413281}),
> Monitor(org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec@890810300}),
> Monitor(java.lang.Object@603970601}),
> Monitor(org.apache.spark.sql.execution.exchange.ShuffleExchangeExec@2042514973})
> {code}
> {code:java}
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
> java.util.concurrent.FutureTask.get(FutureTask.java:191)
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
> org.apache.spark.sql.execution.SubqueryExec.executeCollect(basicPhysicalOperators.scala:861)
> org.apache.spark.sql.execution.ScalarSubquery.updateResult(subquery.scala:80)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$waitForSubqueries$1(SparkPlan.scala:262)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$waitForSubqueries$1$adapted(SparkPlan.scala:261)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3650/586819338.apply(Unknown Source)
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> org.apache.spark.sql.execution.SparkPlan.waitForSubqueries(SparkPlan.scala:261) => holding Monitor(org.apache.spark.sql.execution.aggregate.HashAggregateExec@1335537910})
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:231)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3645/1297667696.apply(Unknown Source)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
> org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
> org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:47)
> org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:660)
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3644/556844527.apply(Unknown Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3645/1297667696.apply(Unknown Source)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:135) => holding Monitor(org.apache.spark.sql.execution.exchange.ShuffleExchangeExec@2042514973})
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:135)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140) => holding Monitor(org.apache.spark.sql.execution.exchange.ShuffleExchangeExec@2042514973})
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike$$Lambda$3671/828122256.apply(Unknown Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3645/1297667696.apply(Unknown Source)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:174) => holding Monitor(org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec@890810300})
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:174)
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:176)
> org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:258)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:256)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3669/118271447.apply(Unknown Source)
> scala.collection.Iterator.foreach(Iterator.scala:943)
> scala.collection.Iterator.foreach$(Iterator.scala:943)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> scala.collection.IterableLike.foreach(IterableLike.scala:74)
> scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:256)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3616/1041219151.apply(Unknown Source)
> org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:228) => holding Monitor(java.lang.Object@603970601})
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:367)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeTake(AdaptiveSparkPlanExec.scala:344)
> org.apache.spark.sql.execution.SubqueryExec.$anonfun$relationFuture$2(basicPhysicalOperators.scala:834)
> org.apache.spark.sql.execution.SubqueryExec$$Lambda$3652/1074832567.apply(Unknown Source)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:145)
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$3653/1322734277.apply(Unknown Source)
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
> org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:143)
> org.apache.spark.sql.execution.SubqueryExec.$anonfun$relationFuture$1(basicPhysicalOperators.scala:830)
> org.apache.spark.sql.execution.SubqueryExec$$Lambda$3648/502350376.apply(Unknown Source)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191)
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$3649/2139778019.call(Unknown Source)
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:750)
> {code}
> I'm not sure what's causing it; please let me know if you need any info, thanks!
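For anyone trying to reproduce this, below is a minimal sketch of the scenario described in the report: N concurrent SQL statements, each containing a scalar subquery, so that each run schedules a SubqueryExec.relationFuture on the driver's shared subquery pool. The table name, query shape, data size, and client-side pool size are my assumptions for illustration, not details from the original report.

{code:scala}
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import org.apache.spark.sql.SparkSession

// Minimal repro sketch for the "20 concurrent SQLs" scenario.
object SubqueryConcurrencyRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SPARK-41129-repro").getOrCreate()
    spark.range(1000000L).createOrReplaceTempView("t")

    // The scalar subquery puts a SubqueryExec into the physical plan, so each
    // run occupies a driver-side "subquery" pool thread while it waits.
    val query = "SELECT count(*) FROM t WHERE id > (SELECT avg(id) FROM t)"

    val concurrency = 20 // 10 reportedly works; 20 reportedly hangs
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutor(Executors.newFixedThreadPool(concurrency))

    val runs = (1 to concurrency).map(_ => Future(spark.sql(query).collect()))
    runs.foreach(Await.result(_, Duration.Inf)) // never returns if the deadlock occurs

    spark.stop()
  }
}
{code}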
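Separately, one plausible reading of the dump, offered as an assumption rather than a confirmed diagnosis: the parked threads are themselves workers of that same driver-side "subquery" pool (note SQLExecution$.withThreadLocalCaptured at the bottom of the stack), and each is parked in ThreadUtils.awaitResult waiting for another subquery future that can only run on that pool. The pool's size is capped by the static, internal conf spark.sql.subquery.maxThreadThreshold (default 16 in Spark 3.x), so if every worker ends up waiting on a task still queued behind it, the pool starves itself. If that reading is right, sizing the pool above the peak subquery concurrency would be one mitigation to test; whether it actually avoids the hang here is untested.

{code:scala}
import org.apache.spark.sql.SparkSession

// Hedged mitigation sketch: raise the cap on the driver's "subquery" pool.
// spark.sql.subquery.maxThreadThreshold is a *static* conf, so it must be set
// before the first SparkSession in the JVM is created (or via spark-submit
// --conf). The value 64 is an assumption; pick something comfortably above
// the peak number of concurrent subqueries in your workload.
val spark = SparkSession.builder()
  .appName("subquery-pool-sized-above-concurrency")
  .config("spark.sql.subquery.maxThreadThreshold", "64")
  .getOrCreate()
{code}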