[ 
https://issues.apache.org/jira/browse/SPARK-18838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137365#comment-16137365
 ] 

Marcelo Vanzin commented on SPARK-18838:
----------------------------------------

[~milesc] after cleaning up a lot of noise from your logs, I could see a bunch 
of errors, but I can't see how any of them are connected to the listener bus. 
They seem to be more related to the way Spark is handling those errors and 
making things worse, instead of correctly handling them. I don't doubt that 
using the "block the listener bus" patch helps, but it probably helps because 
of what I said before: that will have an effect on other parts of Spark, and 
will probably make your application run slower, and will mask these issues. So 
while that behavior may be better for you, it doesn't help us solve the 
underlying issues in the code, and Spark running more slowly is hardly 
something we want.

Here are a few choice errors from your log.

Shuffle files disappearing. There are quite a few of these.
{noformat}
2017-08-15 17:52:17,033 INFO  org.apache.spark.scheduler.DAGScheduler: 
ShuffleMapStage 80 (keyBy at MergeSourcedPapers.scala:94) failed in 266.509 s 
due to org.apache.spark.shuffle.FetchFailedException: 
java.io.FileNotFoundException: 
/mnt/yarn/usercache/milesc/appcache/application_1502651039737_0038/blockmgr-9c34afa1-8738-4003-b532-dc85e04cfb45/26/shuffle_4_4428_0.index
 (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at 
org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:199)
        at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:60)
{noformat}

More missing shuffle data:
{noformat}
2017-08-15 17:52:18,120 INFO  org.apache.spark.MapOutputTrackerMaster: Size of 
output statuses for shuffle 3 is 118757 bytes
2017-08-15 17:52:18,121 INFO  org.apache.spark.MapOutputTrackerMaster: Epoch 
changed, not caching!
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
location for shuffle 3
        at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
        at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
        at 
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.sca
{noformat}

Executors apparently dying:
{noformat}
2017-08-15 17:52:19,508 WARN  
org.apache.spark.network.server.TransportChannelHandler: Exception in 
connection from /10.12.1.121:51766
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at 
io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
{noformat}

Executor apparently dying, another type of error:
{noformat}
2017-08-15 17:52:26,774 WARN  org.apache.spark.storage.BlockManagerMaster: 
Failed to remove broadcast 56 with removeFromMaster = true - Failed to send RPC 
4932545731330680871 to /10.12.1.121:51764: 
java.nio.channels.ClosedChannelException
java.io.IOException: Failed to send RPC 4932545731330680871 to 
/10.12.1.121:51764: java.nio.channels.ClosedChannelException
        at 
org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:249)
        at 
org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:233)
        at 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:514)
        at 
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:488)
        at 
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:427)
        at 
io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:129)
{noformat}

Another type of executor going away message:
{noformat}
2017-08-15 17:52:32,011 INFO  org.apache.spark.scheduler.DAGScheduler: Shuffle 
files lost for executor: 21 (epoch 122)
org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
ip-10-12-1-121.us-west-2.compute.internal/10.12.1.121:33269
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:361)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:336)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
...
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: 
Connection refused: ip-10-12-1-121.us-west-2.compute.internal/10.12.1.121:33269
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
{noformat}

YARN scheduler getting backed up:
{noformat}
2017-08-15 17:56:19,816 ERROR 
org.apache.spark.scheduler.cluster.YarnClusterScheduler: Lost executor 23 on 
ip-10-12-13-61.us-west-2.compute.internal: Slave lost
2017-08-15 17:56:19,863 WARN  
org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: 
Attempted to get executor loss reason for executor id 21 at RPC address 
10.12.1.121:51766, but got no response. Marking as slave lost.
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 240 
seconds. This timeout is controlled by spark.network.timeout
        at 
org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
{noformat}

Weird network issues:
{noformat}
2017-08-15 19:45:14,126 INFO  org.apache.spark.ContextCleaner: Cleaned 
accumulator 13006962
org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
ip-10-12-12-210.us-west-2.compute.internal/10.12.12.210:38223
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:361)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:336)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:396)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
...
Caused by: io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: No 
route to host: ip-10-12-12-210.us-west-2.compute.internal/10.12.12.210:38223
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
{noformat}

And it all ends with Spark apparently endlessly trying to distribute a 
broadcast variable to the same executor over and over again, without making any 
progress. Something that might be understandable with a longer look at the logs 
(and if the executor logs were also available).

So there's a whole bunch of things going on, some of which I've seen before and 
might even be fixed already, but I can't really see anything that is directly 
related to the listener bus being backed up.

> High latency of event processing for large jobs
> -----------------------------------------------
>
>                 Key: SPARK-18838
>                 URL: https://issues.apache.org/jira/browse/SPARK-18838
>             Project: Spark
>          Issue Type: Improvement
>    Affects Versions: 2.0.0
>            Reporter: Sital Kedia
>         Attachments: perfResults.pdf, SparkListernerComputeTime.xlsx
>
>
> Currently we are observing the issue of very high event processing delay in 
> driver's `ListenerBus` for large jobs with many tasks. Many critical 
> component of the scheduler like `ExecutorAllocationManager`, 
> `HeartbeatReceiver` depend on the `ListenerBus` events and this delay might 
> hurt the job performance significantly or even fail the job.  For example, a 
> significant delay in receiving the `SparkListenerTaskStart` might cause 
> `ExecutorAllocationManager` manager to mistakenly remove an executor which is 
> not idle.  
> The problem is that the event processor in `ListenerBus` is a single thread 
> which loops through all the Listeners for each event and processes each event 
> synchronously 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94.
>  This single threaded processor often becomes the bottleneck for large jobs.  
> Also, if one of the Listener is very slow, all the listeners will pay the 
> price of delay incurred by the slow listener. In addition to that a slow 
> listener can cause events to be dropped from the event queue which might be 
> fatal to the job.
> To solve the above problems, we propose to get rid of the event queue and the 
> single threaded event processor. Instead each listener will have its own 
> dedicate single threaded executor service . When ever an event is posted, it 
> will be submitted to executor service of all the listeners. The Single 
> threaded executor service will guarantee in order processing of the events 
> per listener.  The queue used for the executor service will be bounded to 
> guarantee we do not grow the memory indefinitely. The downside of this 
> approach is separate event queue per listener will increase the driver memory 
> footprint. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to