[ https://issues.apache.org/jira/browse/SPARK-47759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bo Xiong updated SPARK-47759:
-----------------------------
    Description: 
h2. Symptom

We have observed that our Spark apps occasionally get stuck with an unexpected stack trace when reading/parsing a time string.

 

[Stack Trace 1] On emr-7.0.0 with Spark 3.5.0 runtime, the stack trace doesn't 
make sense since *120s* is a legitimate time string.
{code:java}
Caused by: java.lang.RuntimeException: java.lang.NumberFormatException: Time 
must be specified as seconds (s), milliseconds (ms), microseconds (us), minutes 
(m or min), hour (h), or day (d). E.g. 50s, 100ms, or 250us.
Failed to parse time string: 120s
    at org.apache.spark.network.util.JavaUtils.timeStringAs(JavaUtils.java:258)
    at 
org.apache.spark.network.util.JavaUtils.timeStringAsSec(JavaUtils.java:275)
    at org.apache.spark.util.Utils$.timeStringAsSeconds(Utils.scala:1166)
    at org.apache.spark.rpc.RpcTimeout$.apply(RpcTimeout.scala:131)
    at org.apache.spark.util.RpcUtils$.askRpcTimeout(RpcUtils.scala:41)
    at org.apache.spark.rpc.RpcEndpointRef.<init>(RpcEndpointRef.scala:33)
    at 
org.apache.spark.rpc.netty.NettyRpcEndpointRef.<init>(NettyRpcEnv.scala:533)
    at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:640)
    at 
org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:697)
    at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:682)
    at 
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
    at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
    at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
    at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
    at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at 
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at 
org.apache.spark.network.crypto.TransportCipher$DecryptionHandler.channelRead(TransportCipher.java:192)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:840) {code}
 

[Stack Trace 2] On emr-6.9.0 with Spark 3.3.0 runtime, the stack trace doesn't 
make sense since *3s* is a legitimate time string.
{code:java}
24/03/01 21:04:24 ERROR TransportRequestHandler: Error while invoking 
RpcHandler#receive() on RPC id 7358995660930182634
java.lang.NumberFormatException: Time must be specified as seconds (s), 
milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). 
E.g. 50s, 100ms, or 250us.
Failed to parse time string: 3s
    at org.apache.spark.network.util.JavaUtils.timeStringAs(JavaUtils.java:252) 
~[spark-network-common_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at 
org.apache.spark.internal.config.ConfigHelpers$.timeFromString(ConfigBuilder.scala:56)
 ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at 
org.apache.spark.internal.config.ConfigBuilder.$anonfun$timeConf$1(ConfigBuilder.scala:256)
 ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at 
org.apache.spark.internal.config.ConfigBuilder.$anonfun$timeConf$1$adapted(ConfigBuilder.scala:256)
 ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at 
org.apache.spark.internal.config.ConfigEntryWithDefaultString.readFrom(ConfigEntry.scala:206)
 ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.SparkConf.get(SparkConf.scala:261) 
~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.util.RpcUtils$.retryWaitMs(RpcUtils.scala:46) 
~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.rpc.RpcEndpointRef.<init>(RpcEndpointRef.scala:34) 
~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at 
org.apache.spark.rpc.netty.NettyRpcEndpointRef.<init>(NettyRpcEnv.scala:533) 
~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:640) 
~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at 
org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:697)
 ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at 
org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:682) 
~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at 
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
 ~[spark-network-common_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]

    ...

    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) 
~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
    at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
 ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
    at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
~[netty-common-4.1.74.Final.jar:4.1.74.Final]
    at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_402] {code}
 

*[Stack Trace 3]* On emr-7.0.0 with Spark 3.5.0 runtime, the stack trace is slightly different, but it also points to the evaluation of
{noformat}
val timeout = { Utils.timeStringAsSeconds(finalProp._2).seconds }{noformat}
in RpcTimeout$.apply (RpcTimeout.scala:131).

{code:java}
24/04/02 14:20:14 ERROR TransportRequestHandler: Error while invoking 
RpcHandler#receive() for one-way message.
java.lang.NullPointerException: Cannot invoke 
"scala.concurrent.duration.Duration$.apply(long, 
java.util.concurrent.TimeUnit)" because 
"scala.concurrent.duration.Duration$.MODULE$" is null
    at 
scala.concurrent.duration.package$DurationLong$.durationIn$extension(package.scala:60)
 ~[scala-library-2.12.17.jar:?]
    at 
scala.concurrent.duration.package$DurationLong.durationIn(package.scala:60) 
~[scala-library-2.12.17.jar:?]
    at 
scala.concurrent.duration.DurationConversions.seconds(DurationConversions.scala:37)
 ~[scala-library-2.12.17.jar:?]
    at 
scala.concurrent.duration.DurationConversions.seconds$(DurationConversions.scala:37)
 ~[scala-library-2.12.17.jar:?]
    at scala.concurrent.duration.package$DurationLong.seconds(package.scala:59) 
~[scala-library-2.12.17.jar:?]
    at org.apache.spark.rpc.RpcTimeout$.apply(RpcTimeout.scala:131) 
~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.util.RpcUtils$.askRpcTimeout(RpcUtils.scala:41) 
~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.rpc.RpcEndpointRef.<init>(RpcEndpointRef.scala:33) 
~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at 
org.apache.spark.rpc.netty.NettyRpcEndpointRef.<init>(NettyRpcEnv.scala:533) 
~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:640) 
~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at 
org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:697)
 ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at 
org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:689) 
~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at 
org.apache.spark.network.server.AbstractAuthRpcHandler.receive(AbstractAuthRpcHandler.java:66)
 ~[spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at 
org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:274)
 [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)
 [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
 [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
 [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
 [netty-handler-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 [netty-codec-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
 [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
org.apache.spark.network.crypto.TransportCipher$DecryptionHandler.channelRead(TransportCipher.java:192)
 [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) 
[netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
 [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) 
[netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) 
[netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
 [netty-common-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
[netty-common-4.1.96.Final.jar:4.1.96.Final]
    at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [netty-common-4.1.96.Final.jar:4.1.96.Final]
    at java.lang.Thread.run(Thread.java:840) [?:?] {code}
h2. Relevant code paths

Within an executor process, there's a [dispatcher thread|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170] dedicated to CoarseGrainedExecutorBackend (a single RPC endpoint) that launches tasks scheduled by the driver. Each task runs on a TaskRunner thread drawn from a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are therefore different threads, yet they read and write a common object (i.e., taskResources), a mutable HashMap with no thread-safety guarantees, in [Executor|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561] and [CoarseGrainedExecutorBackend|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189], respectively.
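The shared access pattern can be sketched in plain Java (names here are illustrative stand-ins; Spark's actual field is a scala.collection.mutable.HashMap keyed by task id):

```java
import java.util.HashMap;
import java.util.Map;

public class TaskResourcesSketch {
    // Hypothetical stand-in for the taskResources map: a plain mutable map
    // written by the dispatcher thread (on task launch) and read/removed by
    // TaskRunner threads (on task completion), with no synchronization.
    private static final Map<Long, String> taskResources = new HashMap<>();

    public static void main(String[] args) {
        // Dispatcher thread: record resources when a task is launched.
        taskResources.put(42L, "gpu:0");

        // TaskRunner thread: look up and clear them when the task finishes.
        // Sequentially this is fine, but when the two steps run on different
        // threads, a put() that triggers a bucket resize can interleave with
        // the remove() and corrupt the map's internal linked structure.
        String released = taskResources.remove(42L);
        System.out.println(released); // prints "gpu:0"
    }
}
```

The snippet is deliberately single-threaded; the point is only the API shape of the two call sites that race in the real code.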
h2. What's going on?

Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when the two threads access the same HashMap object. For illustration purposes, consider the following scenario, in which two threads (Thread 1 and Thread 2) access a hash table without thread safety:
 * Thread 1 sees A.next = B, but then yields execution to Thread 2.

!hashtable1.png|width=357,height=182!
 * Thread 2 triggers a resize operation that results in B.next = A (note that a hashmap doesn't preserve ordering), and then yields execution to Thread 1.

!hashtable2.png|width=383,height=329!
 * After taking over the CPU, Thread 1 runs into an "infinite loop" when traversing the list in the last bucket, since A.next = B and B.next = A in its view.
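The corrupted bucket state can be reproduced deterministically in a single thread (a minimal sketch; the node names follow the figures above):

```java
// Simulation of the corrupted bucket: after the interleaved resize,
// A.next == B and B.next == A, so a traversal that expects a
// null-terminated list never reaches the end.
public class BucketCycleDemo {
    static final class Node {
        final String key;
        Node next;
        Node(String key) { this.key = key; }
    }

    public static void main(String[] args) {
        Node a = new Node("A");
        Node b = new Node("B");
        a.next = b;   // the state Thread 1 observed before yielding
        b.next = a;   // the state after Thread 2's resize relinked the bucket

        // Bounded traversal: a correct two-node list terminates in two steps;
        // here we walk far past that and still never reach null.
        Node cur = a;
        int steps = 0;
        while (cur != null && steps < 10) {
            cur = cur.next;
            steps++;
        }
        System.out.println(cur == null ? "terminated" : "still-walking");
        // prints "still-walking"
    }
}
```

Without the `steps < 10` bound, the loop spins forever, which is exactly the state the dispatcher thread is stuck in.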

h2. Proposed fix
 * Replace {{scala.collection.mutable.HashMap}} in CoarseGrainedExecutorBackend with {{java.util.concurrent.ConcurrentHashMap}} for thread safety.
 * As a mitigation until the fix is released, consider using fewer threads per executor process (i.e., a lower spark.executor.cores) to reduce the likelihood of such a race condition.
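A minimal sketch of the proposed change, assuming a taskResources-like map keyed by task id (the field and thread names are illustrative, not Spark's actual identifiers):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConcurrentFixSketch {
    public static void main(String[] args) throws InterruptedException {
        // ConcurrentHashMap supports concurrent put/remove without external
        // locking, so the dispatcher and TaskRunner threads cannot corrupt
        // its internal bucket structure the way they can with a plain HashMap.
        ConcurrentMap<Long, String> taskResources = new ConcurrentHashMap<>();

        Thread dispatcher = new Thread(() -> {
            for (long id = 0; id < 10_000; id++) taskResources.put(id, "res-" + id);
        });
        Thread taskRunner = new Thread(() -> {
            for (long id = 0; id < 10_000; id++) taskResources.remove(id);
        });
        dispatcher.start();
        taskRunner.start();
        dispatcher.join();
        taskRunner.join();

        // Whatever interleaving occurred, the map is structurally intact;
        // any leftover entries are just keys removed before being inserted.
        System.out.println(taskResources.size() <= 10_000);
        // prints "true"
    }
}
```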

[Stack Trace 2] The second trace above ("Failed to parse time string: 3s") was 
captured on emr-6.9.0 with Spark 3.3.0 runtime. It doesn't make sense either, 
since *3s* is a legitimate time string.
h2. Relevant code paths

Within an executor process, there's a [dispatcher 
thread|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170]
 dedicated to CoarseGrainedExecutorBackend (a single RPC endpoint) that 
launches tasks scheduled by the driver. Each task is run on a TaskRunner thread 
backed by a thread pool created for the executor. The TaskRunner thread and the 
dispatcher thread are therefore distinct. However, they both read and write a 
shared object (taskResources), a mutable HashMap with no thread-safety 
guarantees, in 
[Executor|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561]
 and 
[CoarseGrainedExecutorBackend|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189],
 respectively.
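
For reference, the shared-state pattern described above can be sketched as the 
following minimal Java snippet. This is illustrative only, not Spark's actual 
code: the method names are invented, and in Spark the two methods run on 
different threads (TaskRunner vs. dispatcher) with no synchronization between 
them.
{code:java}
import java.util.HashMap;
import java.util.Map;

class TaskResourcesSketch {
    // The shared, non-thread-safe map (scala.collection.mutable.HashMap in Spark).
    static final Map<Long, Map<String, String>> taskResources = new HashMap<>();

    // TaskRunner thread: record the resources assigned to a launched task.
    static void onTaskStart(long taskId, Map<String, String> resources) {
        taskResources.put(taskId, resources);
    }

    // Dispatcher thread: on a terminal status update, read and drop the entry.
    static Map<String, String> onTaskFinished(long taskId) {
        Map<String, String> r = taskResources.remove(taskId);
        return r == null ? Map.of() : r;
    }
}{code}
Run single-threaded, this is harmless; the bug arises only when the two methods 
interleave on different threads, as they do in the executor.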
h2. What's going on?

Based on the above observations, our hypothesis is that the dispatcher thread 
runs into an "infinite loop" due to a race condition when two threads access 
the same hashmap object. For illustration purposes, consider the following 
scenario, in which two threads (Thread 1 and Thread 2) access a hash table 
without thread safety:
 * Thread 1 sees A.next = B, but then yields execution to Thread 2

!hashtable1.png|width=357,height=182!
 
 * Thread 2 triggers a resize operation that results in B.next = A (note that a 
hashmap doesn't guarantee ordering), and then yields execution to Thread 1.

 

!hashtable2.png|width=383,height=329!

 
 * After regaining the CPU, Thread 1 runs into an "infinite loop" when 
traversing the list in the last bucket, since A.next = B and B.next = A in its 
view.
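
The scenario in the figures can be reproduced deterministically on a toy bucket 
list. The sketch below is illustrative only (not Spark or JDK internals): it 
hard-codes the two interleaved updates and uses Floyd's cycle detection to show 
that the traversal would never reach the end of the list.
{code:java}
class BucketCycleDemo {
    // A minimal hash-bucket node: key plus a next pointer.
    static final class Node {
        final String key;
        Node next;
        Node(String key) { this.key = key; }
    }

    // Floyd's tortoise-and-hare cycle detection over a bucket list.
    static boolean hasCycle(Node head) {
        Node slow = head, fast = head;
        while (fast != null && fast.next != null) {
            slow = slow.next;
            fast = fast.next.next;
            if (slow == fast) return true;
        }
        return false;
    }

    static boolean simulateRace() {
        Node a = new Node("A");
        Node b = new Node("B");
        a.next = b;  // Thread 1's view before yielding: A -> B
        b.next = a;  // Thread 2's resize reverses the bucket: B -> A
        // Thread 1 resumes its traversal from A and never reaches null.
        return hasCycle(a);
    }
}{code}
A plain traversal loop (`while (n != null) n = n.next;`) over this corrupted 
bucket is exactly the "infinite loop" the dispatcher thread is hypothesized to 
be stuck in.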

h2. Proposed fix
 * Replace {{scala.collection.mutable.HashMap}} in CoarseGrainedExecutorBackend 
with {{java.util.concurrent.ConcurrentHashMap}} for thread safety.
 * As a mitigation before the fix is released, consider using fewer threads per 
executor process (i.e., a lower spark.executor.cores) to reduce the likelihood 
of such a race condition.
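
A minimal sketch of the idea behind the fix (illustrative only, not the actual 
Spark patch): the same put/remove pattern driven from several threads against a 
ConcurrentHashMap, whose internal resizing is safe under concurrent readers and 
writers. Thread and entry counts here are arbitrary.
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentFixSketch {
    // Thread-safe replacement for the shared mutable HashMap.
    static final ConcurrentHashMap<Long, Map<String, String>> taskResources =
            new ConcurrentHashMap<>();

    static void hammer() {
        Thread[] threads = new Thread[8];
        for (int t = 0; t < threads.length; t++) {
            final long base = t * 1000L;
            threads[t] = new Thread(() -> {
                // Each thread registers and then releases 1000 distinct tasks.
                for (long i = 0; i < 1000; i++) {
                    taskResources.put(base + i, Map.of("cpu", "1"));
                    taskResources.remove(base + i);
                }
            });
        }
        for (Thread th : threads) th.start();
        for (Thread th : threads) {
            try { th.join(); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}{code}
With the unsynchronized HashMap, a workload like this can corrupt a bucket 
during a resize; with ConcurrentHashMap, every entry is reliably removed and 
the map ends up empty.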


> A Spark app being stuck with an unexpected stack trace when reading/parsing a 
> time string
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-47759
>                 URL: https://issues.apache.org/jira/browse/SPARK-47759
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0, 4.0.0
>            Reporter: Bo Xiong
>            Assignee: Bo Xiong
>            Priority: Critical
>              Labels: hang, pull-request-available, stuck, threadsafe
>             Fix For: 3.5.0, 4.0.0
>
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> h2. Symptom
> It's observed that our Spark apps occasionally got stuck with an unexpected 
> stack trace when reading/parsing a time string.
>  
> [Stack Trace 1] On emr-7.0.0 with Spark 3.5.0 runtime, the stack trace 
> doesn't make sense since *120s* is a legitimate time string.
> {code:java}
> Caused by: java.lang.RuntimeException: java.lang.NumberFormatException: Time 
> must be specified as seconds (s), milliseconds (ms), microseconds (us), 
> minutes (m or min), hour (h), or day (d). E.g. 50s, 100ms, or 250us.
> Failed to parse time string: 120s
>     at 
> org.apache.spark.network.util.JavaUtils.timeStringAs(JavaUtils.java:258)
>     at 
> org.apache.spark.network.util.JavaUtils.timeStringAsSec(JavaUtils.java:275)
>     at org.apache.spark.util.Utils$.timeStringAsSeconds(Utils.scala:1166)
>     at org.apache.spark.rpc.RpcTimeout$.apply(RpcTimeout.scala:131)
>     at org.apache.spark.util.RpcUtils$.askRpcTimeout(RpcUtils.scala:41)
>     at org.apache.spark.rpc.RpcEndpointRef.<init>(RpcEndpointRef.scala:33)
>     at 
> org.apache.spark.rpc.netty.NettyRpcEndpointRef.<init>(NettyRpcEnv.scala:533)
>     at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:640)
>     at 
> org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:697)
>     at 
> org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:682)
>     at 
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
>     at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
>     at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
>     at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
>     at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>     at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>     at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>     at 
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>     at 
> org.apache.spark.network.crypto.TransportCipher$DecryptionHandler.channelRead(TransportCipher.java:192)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>     at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>     at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>     at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>     at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
>     at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
>     at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
>     at 
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
>     at 
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at 
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>     at java.base/java.lang.Thread.run(Thread.java:840) {code}
>  
> [Stack Trace 2] On emr-6.9.0 with Spark 3.3.0 runtime, the stack trace 
> doesn't make sense since *3s* is a legitimate time string.
> {code:java}
> 24/03/01 21:04:24 ERROR TransportRequestHandler: Error while invoking 
> RpcHandler#receive() on RPC id 7358995660930182634
> java.lang.NumberFormatException: Time must be specified as seconds (s), 
> milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day 
> (d). E.g. 50s, 100ms, or 250us.
> Failed to parse time string: 3s
>     at 
> org.apache.spark.network.util.JavaUtils.timeStringAs(JavaUtils.java:252) 
> ~[spark-network-common_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
>     at 
> org.apache.spark.internal.config.ConfigHelpers$.timeFromString(ConfigBuilder.scala:56)
>  ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
>     at 
> org.apache.spark.internal.config.ConfigBuilder.$anonfun$timeConf$1(ConfigBuilder.scala:256)
>  ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
>     at 
> org.apache.spark.internal.config.ConfigBuilder.$anonfun$timeConf$1$adapted(ConfigBuilder.scala:256)
>  ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
>     at 
> org.apache.spark.internal.config.ConfigEntryWithDefaultString.readFrom(ConfigEntry.scala:206)
>  ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
>     at org.apache.spark.SparkConf.get(SparkConf.scala:261) 
> ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
>     at org.apache.spark.util.RpcUtils$.retryWaitMs(RpcUtils.scala:46) 
> ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
>     at org.apache.spark.rpc.RpcEndpointRef.<init>(RpcEndpointRef.scala:34) 
> ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
>     at 
> org.apache.spark.rpc.netty.NettyRpcEndpointRef.<init>(NettyRpcEnv.scala:533) 
> ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
>     at 
> org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:640) 
> ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
>     at 
> org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:697)
>  ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
>     at 
> org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:682) 
> ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
>     at 
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
>  ~[spark-network-common_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
>     ...
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) 
> ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
>     at 
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>  ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
>     at 
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
> ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
>     at 
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>  ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
>     at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_402] {code}
>  
> *[Stack Trace 3]* On emr-7.0.0 with Spark 3.5.0 runtime, the stack trace is 
> slightly different but also points to the evaluation of
> {noformat}
> val timeout = { Utils.timeStringAsSeconds(finalProp._2).seconds }{noformat}
>  
>  
> {code:java}
> 24/04/02 14:20:14 ERROR TransportRequestHandler: Error while invoking 
> RpcHandler#receive() for one-way message.
> java.lang.NullPointerException: Cannot invoke 
> "scala.concurrent.duration.Duration$.apply(long, 
> java.util.concurrent.TimeUnit)" because 
> "scala.concurrent.duration.Duration$.MODULE$" is null
>     at 
> scala.concurrent.duration.package$DurationLong$.durationIn$extension(package.scala:60)
>  ~[scala-library-2.12.17.jar:?]
>     at 
> scala.concurrent.duration.package$DurationLong.durationIn(package.scala:60) 
> ~[scala-library-2.12.17.jar:?]
>     at 
> scala.concurrent.duration.DurationConversions.seconds(DurationConversions.scala:37)
>  ~[scala-library-2.12.17.jar:?]
>     at 
> scala.concurrent.duration.DurationConversions.seconds$(DurationConversions.scala:37)
>  ~[scala-library-2.12.17.jar:?]
>     at 
> scala.concurrent.duration.package$DurationLong.seconds(package.scala:59) 
> ~[scala-library-2.12.17.jar:?]
>     at org.apache.spark.rpc.RpcTimeout$.apply(RpcTimeout.scala:131) 
> ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at org.apache.spark.util.RpcUtils$.askRpcTimeout(RpcUtils.scala:41) 
> ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at org.apache.spark.rpc.RpcEndpointRef.<init>(RpcEndpointRef.scala:33) 
> ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at 
> org.apache.spark.rpc.netty.NettyRpcEndpointRef.<init>(NettyRpcEnv.scala:533) 
> ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at 
> org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:640) 
> ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at 
> org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:697)
>  ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at 
> org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:689) 
> ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at 
> org.apache.spark.network.server.AbstractAuthRpcHandler.receive(AbstractAuthRpcHandler.java:66)
>  ~[spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at 
> org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:274)
>  [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)
>  [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
>  [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
>  [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
>  [netty-handler-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>  [netty-codec-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
>  [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> org.apache.spark.network.crypto.TransportCipher$DecryptionHandler.channelRead(TransportCipher.java:192)
>  [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) 
> [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
>  [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) 
> [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) 
> [netty-transport-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
>  [netty-common-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
> [netty-common-4.1.96.Final.jar:4.1.96.Final]
>     at 
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>  [netty-common-4.1.96.Final.jar:4.1.96.Final]
>     at java.lang.Thread.run(Thread.java:840) [?:?] {code}
>  
> h2. Relevant code paths
> Within an executor process, there's a [dispatcher 
> thread|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170]
>  dedicated to CoarseGrainedExecutorBackend (a single RPC endpoint) that 
> launches tasks scheduled by the driver. Each task is run on a TaskRunner 
> thread backed by a thread pool created for the executor. The TaskRunner 
> thread and the dispatcher thread are therefore distinct. However, they both 
> read and write a shared object (taskResources), a mutable HashMap with no 
> thread-safety guarantees, in 
> [Executor|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561]
>  and 
> [CoarseGrainedExecutorBackend|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189],
>  respectively.
> h2. What's going on?
> Based on the above observations, our hypothesis is that the dispatcher thread 
> runs into an "infinite loop" due to a race condition when two threads access 
> the same hashmap object. For illustration purposes, consider the following 
> scenario, in which two threads (Thread 1 and Thread 2) access a hash table 
> without thread safety:
>  * Thread 1 sees A.next = B, but then yields execution to Thread 2
> !hashtable1.png|width=357,height=182!
>  
>  * Thread 2 triggers a resize operation that results in B.next = A (note that 
> a hashmap doesn't guarantee ordering), and then yields execution to Thread 1.
>  
> !hashtable2.png|width=383,height=329!
>  
>  * After regaining the CPU, Thread 1 runs into an "infinite loop" when 
> traversing the list in the last bucket, since A.next = B and B.next = A in 
> its view.
> h2. Proposed fix
>  * Replace {{scala.collection.mutable.HashMap}} in 
> CoarseGrainedExecutorBackend with {{java.util.concurrent.ConcurrentHashMap}} 
> for thread safety.
>  * As a mitigation before the fix is released, consider using fewer threads 
> per executor process (i.e., a lower spark.executor.cores) to reduce the 
> likelihood of such a race condition.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
