Brad Willard created SPARK-17555:
------------------------------------

             Summary: ExternalShuffleBlockResolver fails randomly with External Shuffle Service and Dynamic Resource Allocation on Mesos running under Marathon
                 Key: SPARK-17555
                 URL: https://issues.apache.org/jira/browse/SPARK-17555
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.0.0, 1.6.2, 1.6.1, 1.5.2
         Environment: Mesos using Docker with the external shuffle service running on Marathon. Running code from the pyspark shell in client mode.
            Reporter: Brad Willard


The external shuffle service throws these errors about 90% of the time. It seems to die between stages and fail inconsistently with this style of error about missing files. I've reproduced the same behavior on all of the Spark versions listed on this jira, using the pre-built Hadoop 2.6 distributions from the Apache Spark download page. I also want to mention that everything works successfully with dynamic resource allocation turned off.

I have read other related bugs and tried some of the suggested workarounds. Some people have blamed the switch from Akka to Netty, which is why I also tested the 1.5.x range, with no luck. I'm currently running these config options (informed by other jiras that seemed related to my problem). These settings have helped it work sometimes instead of never.

{code}
spark.shuffle.service.port              7338
spark.shuffle.io.numConnectionsPerPeer  4
spark.shuffle.io.connectionTimeout      18000s
spark.shuffle.service.enabled           true
spark.dynamicAllocation.enabled         true
{code}
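Since the same settings also get repeated as spark-submit flags below, here is a minimal sketch in plain Python (no pyspark needed) that renders such a conf dict into `--conf` arguments; the helper name `to_submit_flags` is mine, not a Spark API:

```python
# The spark-defaults entries above, as a dict. Values are the ones
# from this report.
SHUFFLE_CONF = {
    "spark.shuffle.service.port": "7338",
    "spark.shuffle.io.numConnectionsPerPeer": "4",
    "spark.shuffle.io.connectionTimeout": "18000s",
    "spark.shuffle.service.enabled": "true",
    "spark.dynamicAllocation.enabled": "true",
}

def to_submit_flags(conf):
    """Render a conf dict as spark-submit --conf arguments, sorted by key
    so the generated command line is stable across runs."""
    return " ".join("--conf %s=%s" % (k, v) for k, v in sorted(conf.items()))

print(to_submit_flags(SHUFFLE_CONF))
```

This keeps the defaults file and the submit script from drifting apart while reproducing.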

On the driver, for the pyspark submit, I'm sending along this config:

{code}
    --conf spark.mesos.executor.docker.image=docker-registry.xxxxxxxxxxxxx.net/machine-learning/spark:spark-1-6-2v1 \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.mesos.coarse=true \
    --conf spark.cores.max=100 \
    --conf spark.executor.uri=$SPARK_EXECUTOR_URI \
    --conf spark.shuffle.service.port=7338 \
    --executor-memory 15g
{code}

Under Marathon I'm pinning each external shuffle service to an agent and starting the service like this:

{code}
$SPARK_HOME/sbin/start-mesos-shuffle-service.sh && tail -f $SPARK_HOME/logs/spark--org.apache.spark.deploy.mesos.MesosExternalShuffleService*
{code}
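For context, the Marathon app definition doing the pinning looks roughly like this. This is a sketch, not my exact app: the `id` and resource numbers are made up, but the `constraints` entry is the Marathon mechanism that pins an instance to one agent by hostname:

```json
{
  "id": "/spark-shuffle-agent002",
  "cmd": "$SPARK_HOME/sbin/start-mesos-shuffle-service.sh && tail -f $SPARK_HOME/logs/spark--org.apache.spark.deploy.mesos.MesosExternalShuffleService*",
  "cpus": 1,
  "mem": 1024,
  "instances": 1,
  "constraints": [["hostname", "CLUSTER", "mesos-agent002"]]
}
```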

On startup all seems well:
{code}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 3.5.2 (default, Jul  2 2016 17:52:12)
SparkContext available as sc, HiveContext available as sqlContext.
>>> 16/09/15 11:35:53 INFO CoarseMesosSchedulerBackend: Mesos task 1 is now TASK_RUNNING
16/09/15 11:35:53 INFO MesosExternalShuffleClient: Successfully registered app a7a50f09-0ce0-4417-91a9-fa694416e903-0091 with external shuffle service.
16/09/15 11:35:55 INFO CoarseMesosSchedulerBackend: Mesos task 0 is now TASK_RUNNING
16/09/15 11:35:55 INFO MesosExternalShuffleClient: Successfully registered app a7a50f09-0ce0-4417-91a9-fa694416e903-0091 with external shuffle service.
16/09/15 11:35:56 INFO CoarseMesosSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (mesos-agent002.[redacted]:61281) with ID 55300bb1-aca1-4dd1-8647-ce4a50f6d661-S6/1
16/09/15 11:35:56 INFO ExecutorAllocationManager: New executor 55300bb1-aca1-4dd1-8647-ce4a50f6d661-S6/1 has registered (new total is 1)
16/09/15 11:35:56 INFO BlockManagerMasterEndpoint: Registering block manager mesos-agent002.[redacted]:46247 with 10.6 GB RAM, BlockManagerId(55300bb1-aca1-4dd1-8647-ce4a50f6d661-S6/1, mesos-agent002.[redacted], 46247)
16/09/15 11:35:59 INFO CoarseMesosSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (mesos-agent004.[redacted]:42738) with ID 55300bb1-aca1-4dd1-8647-ce4a50f6d661-S10/0
16/09/15 11:35:59 INFO ExecutorAllocationManager: New executor 55300bb1-aca1-4dd1-8647-ce4a50f6d661-S10/0 has registered (new total is 2)
16/09/15 11:35:59 INFO BlockManagerMasterEndpoint: Registering block manager mesos-agent004.[redacted]:11262 with 10.6 GB RAM, BlockManagerId(55300bb1-aca1-4dd1-8647-ce4a50f6d661-S10/0, mesos-agent004.[redacted], 11262)

{code}


I'm running a simple sort on a Spark DataFrame loaded from a Parquet file on HDFS. These are the errors; they happen shortly after startup, while agents are being allocated by Mesos.

{code}
16/09/15 14:49:10 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from mesos-agent004:7338
java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-29f93bea-9a87-41de-9046-535401e6d4fd/30/shuffle_0_0_0.index
        at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:286)
        at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:190)
        at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:85)
        at org.apache.spark.deploy.mesos.MesosExternalShuffleBlockHandler.handleMessage(MesosExternalShuffleService.scala:61)
        at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:72)
        at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-29f93bea-9a87-41de-9046-535401e6d4fd/30/shuffle_0_0_0.index (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:146)
        at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:275)
        ... 28 more

        at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:186)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:106)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)
16/09/15 14:49:10 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_0_54_57, and will not retry (0 retries)
java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-29f93bea-9a87-41de-9046-535401e6d4fd/30/shuffle_0_0_0.index
        at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:286)
        at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:190)
        at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:85)
        at org.apache.spark.deploy.mesos.MesosExternalShuffleBlockHandler.handleMessage(MesosExternalShuffleService.scala:61)
        at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:72)
        at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
{code}
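For what it's worth, the missing path looks consistent with the shuffle service's hashed-subdirectory layout, which suggests the lookup logic is fine and the executor's local dirs themselves are gone (e.g. /tmp cleaned up or the container torn down). A rough Python sketch of that layout; this is not Spark source, just my approximation of `ExternalShuffleBlockResolver.getFile` in 1.6, assuming one local dir and the default 64 subdirs per dir:

```python
def java_string_hash(s):
    """Java String.hashCode(), truncated to a signed 32-bit int."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h

def shuffle_index_path(local_dirs, subdirs_per_dir, shuffle_id, map_id):
    """Approximates how the external shuffle service picks a local dir and
    a hex-named hashed subdirectory for a sort-based shuffle index file."""
    filename = "shuffle_%d_%d_0.index" % (shuffle_id, map_id)
    h = abs(java_string_hash(filename))  # non-negative hash of the filename
    local_dir = local_dirs[h % len(local_dirs)]
    sub_dir = (h // len(local_dirs)) % subdirs_per_dir
    return "%s/%02x/%s" % (local_dir, sub_dir, filename)

# Under these assumptions, shuffle_0_0_0.index hashes into the "30"
# subdirectory, matching the path in the stack traces above.
print(shuffle_index_path(
    ["/tmp/blockmgr-29f93bea-9a87-41de-9046-535401e6d4fd"], 64, 0, 0))
```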



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
