Re: [Discuss][Spark staging dir] way to disable spark writing to _temporary

2017-04-08 Thread Steve Loughran

On 8 Apr 2017, at 10:36, Yash Sharma wrote:

Very interesting. I will give it a try. Thanks for pointing this out.
Also, are you planning to contribute it to Spark, and could it be a good
default option for Spark S3 copies?


It's going into Hadoop core itself, HADOOP-13786, with the relevant changes
to FileOutputFormat to make it easier to implement similar committers for
Azure, etc, where possible. There are matching changes going into S3A, and it'll
be implemented in the hadoop-aws JAR, so you'll need to be in sync across whichever
Hadoop release has it.

Spark is going to need some changes to the Parquet output code in
ParquetFileFormat, as it's too fussy about the committer type; either that code
gets relaxed inside Spark, or someone implements a bridge class. Since I've seen
almost no enthusiasm for S3/cloud-related Spark PRs from me, I'll probably just
do option 2 externally. Sorry.
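
To sketch what option 2 could look like (purely illustrative; the only real
classes here are the Hadoop and Parquet ones, everything else is a made-up
name, and this is not any actual implementation): a ParquetOutputCommitter
subclass that simply delegates the commit lifecycle to whatever committer
really does the work.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}
import org.apache.parquet.hadoop.ParquetOutputCommitter

// Illustrative bridge: satisfies the "must be a ParquetOutputCommitter"
// check while handing the actual commit lifecycle to another committer.
class BridgeParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
    extends ParquetOutputCommitter(outputPath, context) {

  // Stand-in delegate; in practice this would be the S3-specific committer
  // being bridged, constructed however that committer expects.
  private val delegate: OutputCommitter = new FileOutputCommitter(outputPath, context)

  override def setupJob(jobContext: JobContext): Unit = delegate.setupJob(jobContext)
  override def commitJob(jobContext: JobContext): Unit = delegate.commitJob(jobContext)
  override def setupTask(taskContext: TaskAttemptContext): Unit = delegate.setupTask(taskContext)
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean =
    delegate.needsTaskCommit(taskContext)
  override def commitTask(taskContext: TaskAttemptContext): Unit = delegate.commitTask(taskContext)
  override def abortTask(taskContext: TaskAttemptContext): Unit = delegate.abortTask(taskContext)
}

The point is only the shape: ParquetFileFormat gets the ParquetOutputCommitter
subtype it insists on, while the real commit logic lives elsewhere.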




Have you got any benchmarking that could show the improvements in the job?

Given that S3's internal copy runs at about 6-10 MB/s, you can estimate the
execution time of today's rename as (bytes / number of parallel threads in the
s3a rename); you'll still have the upload at the end of task commit, so whichever
task(s) commit last will also have a measurable effect.
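
As a purely back-of-envelope illustration of that estimate (all numbers below
are made up; only the 6-10 MB/s copy rate comes from above):

// Back-of-envelope estimate of today's commit-by-rename cost on S3,
// treating "rename" as a server-side copy at ~6-10 MB/s per thread.
object RenameEstimate {
  def renameSeconds(totalBytes: Long, copyMBperSec: Double, parallelCopies: Int): Double =
    (totalBytes / (1024.0 * 1024.0)) / (copyMBperSec * parallelCopies)

  def main(args: Array[String]): Unit = {
    val outputBytes = 200L * 1024 * 1024 * 1024            // say, 200 GB of job output
    val secs = renameSeconds(outputBytes, copyMBperSec = 8, parallelCopies = 10)
    println(f"estimated rename time: ~$secs%.0f s (~${secs / 60}%.0f min)")  // ~2560 s, ~43 min
  }
}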

I'll have some TPC-DS numbers once I've got my variant of Ryan's committer
ready for a colleague to test against. We've already got some good speedups
with the read pipeline and recursive listing stuff in Hadoop 2.8; the commit
one is the big one. If you grab Hadoop 2.8.0 right now you get all its
read-time speedups, and the option for fast buffer uploads, which will
remove or reduce the delay at the end of IOStream.close() for the PUT.
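
For anyone who wants to try it, a sketch of switching that on from Spark (e.g.
in spark-shell). The fs.s3a.* names below are the Hadoop 2.8-era properties as
I recall them, so verify them against the S3A documentation for whichever
Hadoop release is actually on your classpath:

import org.apache.spark.sql.SparkSession

// Sketch: enabling the S3A fast/block upload path from a Spark job.
// Property names assumed from the Hadoop 2.8 S3A docs; check your release.
val spark = SparkSession.builder()
  .appName("s3a-fast-upload-example")
  .config("spark.hadoop.fs.s3a.fast.upload", "true")          // buffer + multipart upload as you write
  .config("spark.hadoop.fs.s3a.fast.upload.buffer", "disk")   // or "array" / "bytebuffer"
  .getOrCreate()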

In the meantime, don't commit directly to S3. Even with the classic 
"commit-by-rename" committer it's not safe: that committer assumes the rename 
is an atomic O(1) transaction, and in an object store, it isn't.
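
To spell that out: S3 has no rename primitive, so a directory "rename"
degenerates into copy-then-delete of every object, something like the sketch
below against the AWS Java SDK (bucket and prefixes are made up; pagination
and error handling omitted):

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import scala.collection.JavaConverters._

// Illustrative only: what "rename a directory" degenerates to on S3.
// Each object is copied server-side and then deleted; the cost is O(data),
// not O(1), and a failure part-way through leaves output half-renamed.
object S3RenameIsNotAtomic {
  def main(args: Array[String]): Unit = {
    val s3 = AmazonS3ClientBuilder.defaultClient()
    val bucket = "my-bucket"                                   // hypothetical names
    val (srcPrefix, dstPrefix) = ("tmp/_temporary/0/", "final/")

    // NOTE: listObjects returns at most 1000 keys; real code must paginate.
    for (obj <- s3.listObjects(bucket, srcPrefix).getObjectSummaries.asScala) {
      val dstKey = dstPrefix + obj.getKey.stripPrefix(srcPrefix)
      s3.copyObject(bucket, obj.getKey, bucket, dstKey)        // server-side COPY, ~6-10 MB/s
      s3.deleteObject(bucket, obj.getKey)
    }
  }
}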





Re: [Discuss][Spark staging dir] way to disable spark writing to _temporary

2017-04-08 Thread Yash Sharma
Very interesting. I will give it a try. Thanks for pointing this out.
Also, are you planning to contribute it to Spark, and could it be a good
default option for Spark S3 copies?
Have you got any benchmarking that could show the improvements in the job?

Thanks,
Yash



Re: [Discuss][Spark staging dir] way to disable spark writing to _temporary

2017-04-07 Thread Ryan Blue
Yash,

We (Netflix) built a committer that uses the S3 multipart upload API to
avoid the copy problem and still handle task failures. You can build and
use the copy posted here:

  https://github.com/rdblue/s3committer

You're probably interested in the S3PartitionedOutputCommitter.
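
If it helps, a rough sketch of pointing a Spark 2.x job at it; the property
name and the committer's package/class name are my assumptions here, so check
the s3committer README for the exact settings and supported Spark/Hadoop
versions:

import org.apache.spark.sql.SparkSession

// Sketch only: wiring a custom OutputCommitter into Spark 2.x datasource
// writes. The class name below is assumed from the s3committer project
// layout; verify it against that repo before relying on it. Note that
// Parquet output is configured separately (spark.sql.parquet.output.committer.class)
// and expects a ParquetOutputCommitter subclass.
val spark = SparkSession.builder()
  .appName("s3committer-example")
  .config("spark.sql.sources.outputCommitterClass",
    "com.netflix.bdp.s3.S3PartitionedOutputCommitter")
  .getOrCreate()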

rb


-- 
Ryan Blue
Software Engineer
Netflix


[Discuss][Spark staging dir] way to disable spark writing to _temporary

2017-04-06 Thread Yash Sharma
Hi All,
This is another issue I was facing with Spark-S3 operability, and I wanted
to ask the broader community whether anyone else has hit it.

I have a rather simple aggregation query with a basic transformation. The
output, however, has a lot of partitions (20K partitions). The Spark job
runs very fast and reaches the end without any failures. Up to that point
the job has been writing to the staging dir and runs alright.

As soon as Spark starts renaming these files it faces 2 issues:
1. S3 single-path renames are insanely slow, and the job spends a huge
amount of time renaming these files.
2. Renames sometimes fail: Spark probably has checks after writing the
file (not sure), and a few renames fail randomly because of S3's eventual
consistency, causing the job to fail intermittently. [logs added at the end]

I was wondering what some workarounds for this problem could be, or whether
it is possible to override this behavior and write files directly to the
expected paths (skipping the _temporary staging dir).

Cheers,
Yash

{logs}
java.io.IOException: Failed to rename
FileStatus{path=s3://instances/_temporary/0/task_201704060437_0005_m_52/utc_date=2012-06-19/product=obsolete;
isDirectory=true; modification_time=0; access_time=0; owner=; group=;
permission=rwxrwxrwx; isSymlink=false} to
s3://instances/utc_date=2012-06-19/product=obsolete
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.renameOrMerge(FileOutputCommitter.java:441)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:432)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
...
...
InsertIntoHadoopFsRelationCommand.scala:115)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
...
...
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)
17/04/06 04:41:54 ERROR DynamicPartitionWriterContainer: Job
job_201704060436_ aborted.
17/04/06 04:41:54 ERROR ActiveInstances$: Exception in running
ActiveInstances.
org.apache.spark.SparkException: Job aborted.
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:149)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)

{logs}