RE: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-20 Thread Shuai Zheng
Thanks!

 

Let me update the status.

 

I have copied the DirectOutputCommitter into my local code, and set:

 

conf.set("spark.hadoop.mapred.output.committer.class", "org..DirectOutputCommitter")

 

It works perfectly.
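(For completeness, a sketch of the same setting applied from spark-shell, where the
SparkContext already exists: keys prefixed with spark.hadoop. are copied into the
underlying Hadoop configuration with the prefix stripped, so the bare key can also be
set directly on sc.hadoopConfiguration. The package name below is only a placeholder
for wherever the copied committer actually lives.)

// Same effect as spark.hadoop.mapred.output.committer.class, but set on the
// running context's Hadoop configuration instead of a fresh SparkConf.
// Replace the class name with the real location of the copied committer.
sc.hadoopConfiguration.set(
  "mapred.output.committer.class",
  "com.example.hadoop.DirectOutputCommitter")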

 

Thanks everyone :)

 

Regards,

 

Shuai

 



Re: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-17 Thread Imran Rashid
I'm not super familiar w/ S3, but I think the issue is that you want to use
a different output committer with object stores, which don't have a simple
move operation.  There have been a few other threads on S3 and output
committers.  I think the most relevant one for you is probably this open JIRA:

https://issues.apache.org/jira/browse/SPARK-6352
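(To make the "no simple move operation" point concrete: the default
FileOutputCommitter promotes task output by renaming it out of _temporary. A rough
sketch of that single step is below, with placeholder paths. On HDFS the rename is a
metadata-only operation; the s3n filesystem instead implements it as a copy of each
object followed by a delete, which is why the commit phase is so slow.)

// Sketch of the expensive step, runnable from spark-shell; paths are placeholders.
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new URI("s3n://mybucket"), sc.hadoopConfiguration)
fs.rename(
  new Path("s3n://mybucket/out/_temporary/0/task_000000_0/part-00000"),
  new Path("s3n://mybucket/out/part-00000"))  // on s3n: copy the object, then delete it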


Re: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-17 Thread Aaron Davidson
Actually, this is the more relevant JIRA (which is resolved):
https://issues.apache.org/jira/browse/SPARK-3595

6352 is about saveAsParquetFile, which is not in use here.

Here is a DirectOutputCommitter implementation:
https://gist.github.com/aarondav/c513916e72101bbe14ec

and it can be configured in Spark with:
sparkConf.set("spark.hadoop.mapred.output.committer.class",
  classOf[DirectOutputCommitter].getName)
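(A direct committer of this sort is essentially a no-op OutputCommitter: tasks write
straight to the final output location, so there is nothing left under _temporary to
move at job end. A minimal sketch against the old mapred API is below; see the gist
above for the actual implementation. The usual caveat applies: with no task commit,
speculative execution or task retries can leave partial or duplicate files.)

import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

// Minimal no-op committer sketch: skip the _temporary promotion entirely.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}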


Re: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-16 Thread Akhil Das
If you use fileStream, there's an option to filter out files. In your case
you can easily create a filter to remove _temporary files. In that case,
you will have to move your code inside foreachRDD of the dstream, since the
application will become a streaming app.

Thanks
Best Regards
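
(A minimal sketch of the fileStream-with-filter idea above, assuming the job is
rewritten as a streaming app; the bucket paths, batch interval, and input format are
placeholders.)

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(60))

// Only pick up files that are not sitting under a _temporary directory.
val cleanFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "s3n://mybucket/input",
  (path: Path) => !path.toString.contains("_temporary"),
  newFilesOnly = true)

// As noted above, the per-batch logic moves inside foreachRDD.
cleanFiles.map(_._2.toString).foreachRDD { (rdd, time) =>
  rdd.saveAsTextFile(s"s3n://mybucket/output/${time.milliseconds}")
}

ssc.start()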


Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-13 Thread Shuai Zheng
Hi All,

 

I am trying to run a sort on an r3.2xlarge instance on AWS, just as a single-node
cluster for testing. The data I sort is around 4 GB and sits on S3; the output will
also go to S3.

 

I just connect spark-shell to the local cluster and run the code as a script
(because I only want a benchmark for now).

 

My job is as simple as:

val parquetFile = sqlContext.parquetFile("s3n://...", "s3n://...", "s3n://...",
  "s3n://...", "s3n://...", "s3n://...", "s3n://...")

parquetFile.registerTempTable("Test")

val sortedResult = sqlContext.sql("SELECT * FROM Test order by time")
  .map { row => row.mkString("\t") }

sortedResult.saveAsTextFile("s3n://myplace")

 

The job takes around 6 minutes to finish the sort while I am monitoring the
process. After that I notice the process stops at:

 

15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at
<console>:31, took 581.304992 s

 

At that point Spark has actually written all the data to the _temporary folder
first; after all sub-tasks finish, it tries to move the finished results from the
_temporary folder to the final location. This step might be quick locally (it is
just a cut/paste), but it looks very slow on my S3: it takes a few seconds to move
one file, and there are usually 200 partitions, so the move phase alone works out
to on the order of ten minutes or more of sequential copies. And then it raises
exceptions after it has moved maybe 40-50 files.

 

org.apache.http.NoHttpResponseException: The target server failed to respond
        at org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)
        at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)
        at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)
        at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)
        at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)

 



 

I have tried several times, but never get the full job to finish. I am not sure
what is wrong here; I am using something very basic, and I can see the job has
finished with all results on S3 under the _temporary folder, but then it raises
the exception and fails.

 

Is there any special setting I should use when dealing with S3?

 

I don't know what the issue is here; I have never seen MapReduce hit a similar
issue, so it should not be an S3 problem.

 

Regards,

 

Shuai



RE: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-13 Thread Shuai Zheng
And one thing I forgot to mention: even though I get this exception and the result
is not well formed in my target folder (part of the files are there, the rest are
under the _temporary folder structure), the step is still marked as successful in
the spark-shell web UI. I think this is a bug?

 

Regards,

 

Shuai

 
