[ https://issues.apache.org/jira/browse/SPARK-47759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bo Xiong updated SPARK-47759:
-----------------------------
Description: 
h2. Symptom

We've observed that our Spark apps occasionally get stuck with an unexpected stack trace when reading/parsing a time string.

[Stack Trace 1] On emr-7.0.0 with the Spark 3.5.0 runtime, the stack trace doesn't make sense since *120s* is a legitimate time string.
{code:java}
Caused by: java.lang.RuntimeException: java.lang.NumberFormatException: Time must be specified as seconds (s), milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). E.g. 50s, 100ms, or 250us.
Failed to parse time string: 120s
    at org.apache.spark.network.util.JavaUtils.timeStringAs(JavaUtils.java:258)
    at org.apache.spark.network.util.JavaUtils.timeStringAsSec(JavaUtils.java:275)
    at org.apache.spark.util.Utils$.timeStringAsSeconds(Utils.scala:1166)
    at org.apache.spark.rpc.RpcTimeout$.apply(RpcTimeout.scala:131)
    at org.apache.spark.util.RpcUtils$.askRpcTimeout(RpcUtils.scala:41)
    at org.apache.spark.rpc.RpcEndpointRef.<init>(RpcEndpointRef.scala:33)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.<init>(NettyRpcEnv.scala:533)
    at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:640)
    at org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:697)
    at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:682)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at org.apache.spark.network.crypto.TransportCipher$DecryptionHandler.channelRead(TransportCipher.java:192)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}

[Stack Trace 2] On emr-6.9.0 with the Spark 3.3.0 runtime, the stack trace doesn't make sense since *3s* is a legitimate time string.
{code:java}
24/03/01 21:04:24 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 7358995660930182634
java.lang.NumberFormatException: Time must be specified as seconds (s), milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). E.g. 50s, 100ms, or 250us.
Failed to parse time string: 3s
    at org.apache.spark.network.util.JavaUtils.timeStringAs(JavaUtils.java:252) ~[spark-network-common_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.internal.config.ConfigHelpers$.timeFromString(ConfigBuilder.scala:56) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.internal.config.ConfigBuilder.$anonfun$timeConf$1(ConfigBuilder.scala:256) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.internal.config.ConfigBuilder.$anonfun$timeConf$1$adapted(ConfigBuilder.scala:256) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.internal.config.ConfigEntryWithDefaultString.readFrom(ConfigEntry.scala:206) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.SparkConf.get(SparkConf.scala:261) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.util.RpcUtils$.retryWaitMs(RpcUtils.scala:46) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.rpc.RpcEndpointRef.<init>(RpcEndpointRef.scala:34) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.<init>(NettyRpcEnv.scala:533) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:640) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:697) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:682) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163) ~[spark-network-common_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
    ...
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_402]
{code}

[Stack Trace 3] On emr-7.0.0 with the Spark 3.5.0 runtime, the stack trace is slightly different but also points to the evaluation of
{noformat}
val timeout = { Utils.timeStringAsSeconds(finalProp._2).seconds }
{noformat}
{code:java}
24/04/02 14:20:14 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
java.lang.NullPointerException: Cannot invoke "scala.concurrent.duration.Duration$.apply(long, java.util.concurrent.TimeUnit)" because "scala.concurrent.duration.Duration$.MODULE$" is null
    at scala.concurrent.duration.package$DurationLong$.durationIn$extension(package.scala:60) ~[scala-library-2.12.17.jar:?]
    at scala.concurrent.duration.package$DurationLong.durationIn(package.scala:60) ~[scala-library-2.12.17.jar:?]
    at scala.concurrent.duration.DurationConversions.seconds(DurationConversions.scala:37) ~[scala-library-2.12.17.jar:?]
    at scala.concurrent.duration.DurationConversions.seconds$(DurationConversions.scala:37) ~[scala-library-2.12.17.jar:?]
    at scala.concurrent.duration.package$DurationLong.seconds(package.scala:59) ~[scala-library-2.12.17.jar:?]
    at org.apache.spark.rpc.RpcTimeout$.apply(RpcTimeout.scala:131) ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.util.RpcUtils$.askRpcTimeout(RpcUtils.scala:41) ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.rpc.RpcEndpointRef.<init>(RpcEndpointRef.scala:33) ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.<init>(NettyRpcEnv.scala:533) ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:640) ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:697) ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:689) ~[spark-core_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.network.server.AbstractAuthRpcHandler.receive(AbstractAuthRpcHandler.java:66) ~[spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:274) [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111) [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140) [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53) [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [netty-handler-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) [netty-codec-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102) [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at org.apache.spark.network.crypto.TransportCipher$DecryptionHandler.channelRead(TransportCipher.java:192) [spark-network-common_2.12-3.5.0-amzn-0.jar:3.5.0-amzn-0]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) [netty-transport-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.96.Final.jar:4.1.96.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.96.Final.jar:4.1.96.Final]
    at java.lang.Thread.run(Thread.java:840) [?:?]
{code}

h2. Relevant code paths

Within an executor process, there's a [dispatcher thread|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170] dedicated to CoarseGrainedExecutorBackend (a single RPC endpoint) that launches tasks scheduled by the driver. Each task runs on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are distinct. However, they both read and write a common object (i.e., taskResources), a mutable hashmap with no thread-safety guarantees, in [Executor|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561] and [CoarseGrainedExecutorBackend|https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189], respectively.
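To make the hazard concrete, here is a minimal, self-contained sketch of the same sharing pattern: two threads mutating one unsynchronized {{scala.collection.mutable.HashMap}}. The object name, value type, and loop bounds are made up for illustration; this is not Spark's actual code.
{code:scala}
import scala.collection.mutable

object TaskResourcesRaceDemo {
  // Hypothetical stand-in for the unsynchronized map shared by the
  // dispatcher thread and the TaskRunner threads (simplified types).
  private val taskResources = new mutable.HashMap[Long, String]()

  def main(args: Array[String]): Unit = {
    // "Dispatcher" thread: records resources as task-launch messages arrive.
    val dispatcher = new Thread(() => {
      for (taskId <- 0L until 1000000L) taskResources(taskId) = s"resources-$taskId"
    })
    // "TaskRunner" thread: removes entries as tasks finish.
    val runner = new Thread(() => {
      for (taskId <- 0L until 1000000L) taskResources.remove(taskId)
    })
    dispatcher.start(); runner.start()
    dispatcher.join(); runner.join()
    // With no synchronization, runs of this program may lose updates, throw,
    // or never terminate if a thread observes a half-applied resize.
    println(s"remaining entries: ${taskResources.size}")
  }
}
{code}
A run that never terminates corresponds to the "stuck dispatcher thread" symptom described below.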
h2. What's going on?

Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object. For illustration purposes, consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread-safety:
 * Thread 1 sees A.next = B, but then yields execution to Thread 2.

!hashtable1.png|width=357,height=182!
 * Thread 2 triggers a resize operation resulting in B.next = A (note that a hashmap doesn't preserve ordering), and then yields execution to Thread 1.

!hashtable2.png|width=383,height=329!
 * After taking over the CPU, Thread 1 runs into an "infinite loop" when traversing the list in the last bucket, given that A.next = B and B.next = A in its view.

h2. Proposed fix
 * Replace {{scala.collection.mutable.HashMap}} in CoarseGrainedExecutorBackend with {{java.util.concurrent.ConcurrentHashMap}} for thread safety (a sketch follows this list).
 * As a mitigation before the fix is released, consider using fewer threads per executor process (i.e., a lower spark.executor.cores) to reduce the likelihood of such a race condition.
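A minimal sketch of the proposed direction, reusing the simplified stand-in types from the sketch above (the real field in CoarseGrainedExecutorBackend maps a task ID to its resource information):
{code:scala}
import java.util.concurrent.ConcurrentHashMap

object ThreadSafeTaskResourcesDemo {
  // ConcurrentHashMap performs atomic, internally synchronized updates,
  // so concurrent puts/removes can no longer corrupt the table.
  private val taskResources = new ConcurrentHashMap[Long, String]()

  def main(args: Array[String]): Unit = {
    val dispatcher = new Thread(() => {
      for (taskId <- 0L until 1000000L) taskResources.put(taskId, s"resources-$taskId")
    })
    val runner = new Thread(() => {
      for (taskId <- 0L until 1000000L) taskResources.remove(taskId)
    })
    dispatcher.start(); runner.start()
    dispatcher.join(); runner.join()
    println(s"remaining entries: ${taskResources.size}") // now terminates reliably
  }
}
{code}
For the mitigation, the thread count per executor can be lowered at submit time, e.g. {{spark-submit --conf spark.executor.cores=2 ...}}; the value 2 is only an example and should be chosen per workload.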
> A Spark app being stuck with an unexpected stack trace when reading/parsing a
> time string
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-47759
>                 URL: https://issues.apache.org/jira/browse/SPARK-47759
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0, 4.0.0
>            Reporter: Bo Xiong
>            Assignee: Bo Xiong
>            Priority: Critical
>              Labels: hang, pull-request-available, stuck, threadsafe
>             Fix For: 3.5.0, 4.0.0
>
>   Original Estimate: 4h
>  Remaining Estimate: 4h
--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org