Issues with partitionBy: FetchFailed

2014-09-21 Thread Julien Carme
Hello,

I am facing an issue with partitionBy; it is not clear whether it is a
problem with my code or with my Spark setup. I am using Spark 1.1
standalone, and my other Spark projects work fine.

So I have to repartition a relatively large file (about 70 million lines).
Here is a minimal version of the code that does not work:

val myRDD = sc.textFile(...).map { line => (extractKey(line), line) }
val myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
myRepartitionedRDD.saveAsTextFile(...)
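For context, here is a more complete, self-contained sketch of the job. The extractKey logic, the paths, and the app name are just placeholders, not the real ones:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object RepartitionJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RepartitionJob"))

    // Placeholder: in the real job the key is extracted from each line
    def extractKey(line: String): String = line.split('\t').head

    // Placeholder input/output paths
    val myRDD = sc.textFile("hdfs:///path/to/input").map { line => (extractKey(line), line) }
    // Hash-partition into 100 partitions, shuffling every record to its target partition
    val myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
    myRepartitionedRDD.saveAsTextFile("hdfs:///path/to/output")

    sc.stop()
  }
}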

It runs for quite some time, until I get some errors and it retries. The errors are:

FetchFailed(BlockManagerId(3,myWorker2, 52082,0),
shuffleId=1,mapId=1,reduceId=5)

The logs are not much more informative. I get:

java.io.IOException: sendMessageReliability failed because ack was not
received within 60 sec

I get similar errors with all my workers.

Do you have some kind of explanation for this behaviour? What could be
wrong?

Thanks,


RE: Issues with partitionBy: FetchFailed

2014-09-21 Thread Shao, Saisai
Hi,

I’ve also met this problem before. I think you can try setting 
“spark.core.connection.ack.wait.timeout” to a larger value to avoid the ack 
timeout; the default is 60 seconds.

Sometimes, because of a GC pause or other reasons, the acknowledgement message 
will time out, which leads to this exception; try setting a larger value for 
this configuration.
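For example, in code (a minimal sketch; the 300-second value is only an illustration, and you can equally set it in spark-defaults.conf):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("RepartitionJob") // placeholder app name
  // Raise the ack timeout (in seconds) from the default 60 to ride out long pauses
  .set("spark.core.connection.ack.wait.timeout", "300")
val sc = new SparkContext(conf)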

Thanks
Jerry


Re: Issues with partitionBy: FetchFailed

2014-09-21 Thread David Rowe
Hi,

I've seen this problem before, and I'm not convinced it's GC.

When Spark shuffles, it writes a lot of small files to store the data to be
sent to other executors (AFAICT). According to what I've read, the intention
is that these files be stored in disk buffers, and since sync() is never
called, they exist only in memory. The problem is that when you have a lot of
shuffle data, and the executors are configured to use, say, 90% of your
memory, one of those is going to end up on disk - either the JVM will be
swapped out, or the files will be written out of cache.

So when these nodes time out, it's not a GC problem; it's that the
machine is actually thrashing.

I've had some success with this problem by reducing the amount of memory
that the executors are configured to use from, say, 90% to 60%. I don't know
the internals of the code, but I'm sure this number is related to the
fraction of your data that's going to be shuffled to other nodes. In any
case, it's not something I can estimate very well for my own jobs - I
usually have to find the right number by trial and error.
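Concretely, that just means asking for smaller executors, leaving headroom for the OS page cache that holds the shuffle files. A sketch (the numbers are only illustrative and depend on your machines):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("RepartitionJob") // placeholder app name
  // e.g. on a 32 GB worker, take roughly 60% of the machine instead of 90%
  .set("spark.executor.memory", "19g")
val sc = new SparkContext(conf)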

Perhaps somebody who knows the internals a bit better can shed some light.

Cheers

Dave
