On 12 Sep 2016, at 19:58, Srikanth <srikanth...@gmail.com> wrote:

Thanks Steve!

We are already using HDFS as an intermediate store. This is for the last stage 
of processing which has to put data in S3.
The output is partitioned by 3 fields, like 
.../field1=111/field2=999/date=YYYY-MM-DD/*
Given that there are 100s of folders and 1000s of subfolders and part files, 
rename from _temporary is just not practical in S3.
I guess we have to add another stage with S3Distcp??

Afraid so

Srikanth

On Sun, Sep 11, 2016 at 2:34 PM, Steve Loughran <ste...@hortonworks.com> wrote:

> On 9 Sep 2016, at 21:54, Srikanth <srikanth...@gmail.com> wrote:
>
> Hello,
>
> I'm trying to use DirectOutputCommitter for s3a in Spark 2.0. I've tried a 
> few configs and none of them seem to work.
> Output always creates _temporary directory. Rename is killing performance.

> I read some notes about DirectOutputCommitter causing problems with 
> speculation turned on. Was this option removed entirely?

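Spark turns off any committer with the word "direct" in its name if 
spark.speculation is true. The concern is concurrency: two speculative 
attempts of the same task could write to the same destination at once.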

Even with non-speculative execution, the trouble with the direct options is that 
executor or job failures can leave incomplete or inconsistent work behind, and the 
things downstream wouldn't even notice.

There's work underway to address things, work which requires a consistent 
metadata store alongside S3 (HADOOP-13345: S3Guard).

For now: stay with the file output committer

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
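
As a rough sketch of how those two options might be set from the application itself: the underlying Hadoop property names start with `mapreduce.`, and Spark copies any option prefixed with `spark.hadoop.` into the Hadoop configuration of the job. The app name is taken from the code quoted later in this thread; nothing else about the job is assumed.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: keep the stock FileOutputCommitter and switch it to the
// v2 commit algorithm. The "spark.hadoop." prefix makes Spark pass each
// option through to the Hadoop Configuration.
val spark = SparkSession.builder()
  .appName("MergeEntities")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped", "true")
  .getOrCreate()
```

The same two keys can also go in spark-defaults.conf or be passed with `--conf` on spark-submit.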

Even better: use HDFS as the intermediate store for work, only do a bulk upload 
at the end.
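
A minimal sketch of that two-step pattern, assuming Parquet output and the three partition columns from the layout mentioned at the top of the thread. `writeThenUpload`, `stagingDir` and `s3Dir` are hypothetical names for illustration. `FileUtil.copy` runs in the driver only, so for thousands of part files a distributed copy (distcp or S3DistCp) is the more realistic choice for the second step.

```scala
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: the paths, the Parquet format, and the column
// names are assumptions, not taken from the real job.
def writeThenUpload(spark: SparkSession, df: DataFrame,
                    stagingDir: String, s3Dir: String): Boolean = {
  // 1. Commit the partitioned output on HDFS, where rename is cheap.
  df.write
    .partitionBy("field1", "field2", "date")
    .parquet(stagingDir)

  // 2. Copy the finished directory tree to S3 in a single pass.
  val conf = spark.sparkContext.hadoopConfiguration
  val src = new Path(stagingDir)
  val dst = new Path(s3Dir)
  FileUtil.copy(src.getFileSystem(conf), src,
                dst.getFileSystem(conf), dst,
                false, // keep the HDFS copy until the upload is verified
                conf)
}
```

A call might look like `writeThenUpload(spark, df, "hdfs:///tmp/merge-out", "s3a://some-bucket/merged")`, where both paths are placeholders.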

>
>   val spark = SparkSession.builder()
>     .appName("MergeEntities")
>     .config("spark.sql.warehouse.dir", mergeConfig.getString("sparkSqlWarehouseDir"))
>     .config("fs.s3a.buffer.dir", "/tmp")
>     .config("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
>     .config("mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
>     .config("mapreduce.use.directfileoutputcommitter", "true")
>     //.config("spark.sql.sources.outputCommitterClass", classOf[DirectOutputCommitter].getCanonicalName)
>     .getOrCreate()
>
> Srikanth


