Hi,

I'm running into consistent failures during a shuffle read while trying to
do a group-by followed by a count aggregation (using the DataFrame API on
Spark 1.5.2).
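
For reference, the job is essentially the following (the column name below
is a placeholder, not the real schema):

    import org.apache.spark.sql.DataFrame

    // Sketch of the failing aggregation; "key" stands in for the real
    // 24-byte grouping column.
    def groupCounts(df: DataFrame): DataFrame =
      df.groupBy("key").count()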

The shuffle read (in stage 1) fails with

org.apache.spark.shuffle.FetchFailedException: Failed to send RPC
7719188499899260109 to host_a/ip_a:35946:
java.nio.channels.ClosedChannelException
                at
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:321)


Looking into the executor logs, I first see

ERROR TransportChannelHandler: Connection to host_b/ip_b:38804 has been
quiet for 120000 ms while there are outstanding requests. Assuming
connection is dead; please adjust spark.network.timeout if this is wrong.

on the node that threw the FetchFailedException (host_a) and

ERROR TransportRequestHandler: Error sending result
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=207789700738,
chunkIndex=894},
buffer=FileSegmentManagedBuffer{file=/local_disk/spark-ed6667d4-445b-4d65-bfda-e4540b7215aa/executor-d03e5e7e-57d4-40e2-9021-c20d0b84bf75/blockmgr-05d5f2b6-142e-415c-a08b-58d16a10b8bf/27/shuffle_1_13732_0.data,
offset=18960736, length=19477}} to /ip_a:32991; closing connection

on the node referenced in the exception (host_b). The error in the host_b
logs occurred a few seconds after the error in the host_a logs. I noticed
a lot of spilling going on during the shuffle read, so I attempted to work
around the problem by increasing the number of shuffle partitions (to
reduce spilling) and by increasing spark.network.timeout. Neither change
got rid of the connection failures.
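
For concreteness, this is roughly how I adjusted the settings (the values
below are illustrative, not the exact ones I tried):

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    // spark.network.timeout defaults to 120s in 1.5.2; I raised it well above that.
    val conf = new SparkConf().set("spark.network.timeout", "600s")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // spark.sql.shuffle.partitions defaults to 200; I raised it to spread
    // the shuffle over more, smaller partitions.
    sqlContext.setConf("spark.sql.shuffle.partitions", "4000")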

This causes part of stage 0 to be recomputed (which runs successfully).
Retry 1 of stage 1 then always fails with

java.io.IOException: FAILED_TO_UNCOMPRESS(5)
                at
org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)

Changing spark.io.compression.codec to lz4 changes this error to

java.io.IOException: Stream is corrupted
        at
net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:153)

which leads me to believe that the connection timeout during the shuffle
read leaves corrupted shuffle files on disk.
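
For completeness, the codec switch was just the standard property (shown
here in the same illustrative SparkConf style as above; the default codec
in 1.5.2 is snappy):

    // Switch the shuffle/spill compression codec from the snappy default to lz4.
    val conf = new org.apache.spark.SparkConf()
      .set("spark.io.compression.codec", "lz4")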

Notably, these failures do not occur when I run on smaller subsets of the
data. The failure occurs while attempting to group ~100 billion rows into
20 billion groups (with a 24-byte key and count as the only aggregation)
on a 16-node cluster. I've reproduced this failure on 2 completely
separate clusters (both running the standalone cluster manager).
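
For scale, a back-of-envelope estimate of the aggregation state (assuming
just the 24-byte key plus an 8-byte count per group, ignoring per-record
overhead):

    20e9 groups x (24 B + 8 B) ≈ 640 GB of state,
    i.e. roughly 40 GB per node across 16 nodes,

so the heavy spilling during the shuffle is not surprising.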

Does anyone have suggestions on how I could make this crash go away, or on
how I could construct a smaller failing test case so the bug can be
investigated more easily?

Best,
Eric Martin
