[ 
https://issues.apache.org/jira/browse/SPARK-54219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-54219:
----------------------------------
        Parent: SPARK-54137
    Issue Type: Sub-task  (was: Bug)

> Driver can't create threads, causing ContextCleaner to get stuck and the stop process to hang
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-54219
>                 URL: https://issues.apache.org/jira/browse/SPARK-54219
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core
>    Affects Versions: 3.2.1, 4.0.1
>            Reporter: angerszhu
>            Assignee: angerszhu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.2.0
>
>
> SparkContext stop stuck on ContextCleaner
> {code:java}
>           
> 25/11/05 18:12:29 ERROR [shutdown-hook-0] ThreadUtils: 
> 14 Driver BLOCKED Blocked by Thread 60 
> Lock(org.apache.spark.ContextCleaner@1726738661})
>   org.apache.spark.ContextCleaner.stop(ContextCleaner.scala:145)
>   org.apache.spark.SparkContext.$anonfun$stop$9(SparkContext.scala:2094)
>   
> org.apache.spark.SparkContext.$anonfun$stop$9$adapted(SparkContext.scala:2094)
>   org.apache.spark.SparkContext$$Lambda$5309/807013918.apply(Unknown Source)
>   scala.Option.foreach(Option.scala:407)
>   org.apache.spark.SparkContext.$anonfun$stop$8(SparkContext.scala:2094)
>   org.apache.spark.SparkContext$$Lambda$5308/1445921225.apply$mcV$sp(Unknown 
> Source)
>   org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1512)
>   org.apache.spark.SparkContext.stop(SparkContext.scala:2094)
>   org.apache.spark.SparkContext.stop(SparkContext.scala:2050)
>   org.apache.spark.sql.SparkSession.stop(SparkSession.scala:718)
>   com.shopee.data.content.ods.live_performance.Main$.main(Main.scala:62)
>   com.shopee.data.content.ods.live_performance.Main.main(Main.scala)
>   sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   java.lang.reflect.Method.invoke(Method.java:498)
>   
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:751)
>  {code}
> ContextCleaner's stop() will wait for the lock
> {code:java}
> def stop(): Unit = {
>   stopped = true
>   // Interrupt the cleaning thread, but wait until the current task has 
> finished before
>   // doing so. This guards against the race condition where a cleaning thread 
> may
>   // potentially clean similarly named variables created by a different 
> SparkContext,
>   // resulting in otherwise inexplicable block-not-found exceptions 
> (SPARK-6132).
>   synchronized {
>     cleaningThread.interrupt()
>   }
>   cleaningThread.join()
>   periodicGCService.shutdown()
> }
>  {code}
> However, one invocation of keepCleaning() holds the lock
>  
> {code:java}
> 25/11/05 18:12:29 ERROR [shutdown-hook-0] ThreadUtils: 
> 60 Spark Context Cleaner TIMED_WAITING 
> Monitor(org.apache.spark.ContextCleaner@1726738661})
>   sun.misc.Unsafe.park(Native Method)
>   java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>   
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
>   
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>   scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:248)
>   scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
>   scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
>   org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:294)
>   org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>   
> org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:194)
>   
> org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:351)
>   
> org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
>   
> org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:78)
>   org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:254)
>   
> org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3(ContextCleaner.scala:204)
>   
> org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3$adapted(ContextCleaner.scala:195)
>   org.apache.spark.ContextCleaner$$Lambda$1178/1994584033.apply(Unknown 
> Source)
>   scala.Option.foreach(Option.scala:407)
>   
> org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:195)
>  => holding Monitor(org.apache.spark.ContextCleaner@1726738661})
>   
> org.apache.spark.ContextCleaner$$Lambda$1109/1496842179.apply$mcV$sp(Unknown 
> Source)
>   org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1474)
>   
> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:189)
>   org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:79) {code}
> BlockManager is stuck on removeBroadcast:
> {color:#ff0000}*RpcUtils.INFINITE_TIMEOUT.awaitResult(future) (changed here by PR 
> [https://github.com/apache/spark/pull/28924])*{color}
> {code:java}
> def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: 
> Boolean): Unit = {
>   val future = driverEndpoint.askSync[Future[Seq[Int]]](
>     RemoveBroadcast(broadcastId, removeFromMaster))
>   future.failed.foreach(e =>
>     logWarning(s"Failed to remove broadcast $broadcastId" +
>       s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
>   )(ThreadUtils.sameThread)
>   if (blocking) {
>     // the underlying Futures will timeout anyway, so it's safe to use 
> infinite timeout here
>     RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
>   }
> } {code}
> In such a case, the only plausible reason is that the RPC went unhandled:
> a driver OOM, or a thread leak in the YARN NodeManager, prevents the creation 
> of new threads to handle the RPC.
> {code:java}
> 25/11/05 08:16:22 ERROR [metrics-paimon-push-gateway-reporter-2-thread-1] 
> ScheduledReporter: Exception thrown from PushGatewayReporter#report. 
> Exception was suppressed.
> java.lang.OutOfMemoryError: unable to create new native thread
>       at java.lang.Thread.start0(Native Method)
>       at java.lang.Thread.start(Thread.java:717)
>       at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1115)
>       at 
> sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1388)
>       at 
> sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1416)
>       at 
> sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1400)
>       at 
> sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
>       at 
> sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
>       at 
> sun.net.www.protocol.https.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java:167)
>       at 
> io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:243)
>       at io.prometheus.client.exporter.PushGateway.push(PushGateway.java:134)
>       at 
> org.apache.paimon.metrics.reporter.PushGatewayReporter.report(PushGatewayReporter.java:84)
>       at 
> com.codahale.metrics.ScheduledReporter.report(ScheduledReporter.java:253)
>       at 
> com.codahale.metrics.ScheduledReporter.lambda$start$0(ScheduledReporter.java:182)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748) {code}
>  
> Then the whole process gets stuck here.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to