Thanks Steve!

We are already using HDFS as an intermediate store. This is for the last
stage of processing which has to put data in S3.
The output is partitioned by 3 fields, like
.../field1=111/field2=999/date=YYYY-MM-DD/*
Given that there are 100s of folders and 1000s of subfolders and part
files, the rename from _temporary is just not practical in S3.
I guess we have to add another stage with S3DistCp?
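
For reference, a minimal sketch of the kind of partitioned write described above, staged on HDFS rather than written straight to S3. The helper name, the staging path argument, the Parquet format and the overwrite mode are all assumptions for illustration; only the three partition columns come from the layout shown above.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object PartitionedStagingSketch {
  // Hypothetical helper: writes `df` under `stagingDir` (for example an
  // hdfs:// path) using the layout
  //   <stagingDir>/field1=.../field2=.../date=.../part-*
  // Parquet and Overwrite are assumptions, not taken from the thread.
  def writeStaged(df: DataFrame, stagingDir: String): Unit =
    df.write
      .mode(SaveMode.Overwrite)
      .partitionBy("field1", "field2", "date")
      .parquet(stagingDir)
}
```

The renames out of _temporary then happen on HDFS, and a separate bulk-copy stage (S3DistCp or similar) would move the finished tree to S3.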

Srikanth

On Sun, Sep 11, 2016 at 2:34 PM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> > On 9 Sep 2016, at 21:54, Srikanth <srikanth...@gmail.com> wrote:
> >
> > Hello,
> >
> > I'm trying to use DirectOutputCommitter for s3a in Spark 2.0. I've tried
> > a few configs and none of them seem to work.
> > Output always creates _temporary directory. Rename is killing
> > performance.
>
> > I read some notes about DirectOutputCommitter causing problems with
> > speculation turned on. Was this option removed entirely?
>
> Spark turns off any committer with the word "direct" in its name if
> speculation==true. The reason is concurrency: speculative attempts of the
> same task would write to the same destination.
>
> Even in non-speculative execution, the trouble with the direct options is
> that executor/job failures can leave incomplete/inconsistent work around,
> and the things downstream wouldn't even notice.
>
> There's work underway to address things, work which requires a consistent
> metadata store alongside S3 (HADOOP-13345: S3Guard).
>
> For now: stay with the file output committer
>
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
> spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
>
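
A minimal sketch of how the two file output committer settings above could be passed in from a Spark 2.0 job. It assumes the Hadoop-side property names are mapreduce.fileoutputcommitter.algorithm.version and mapreduce.fileoutputcommitter.cleanup.skipped, and that they are set with the "spark.hadoop." prefix so Spark copies them into the Hadoop Configuration. The object name is hypothetical, and the master is expected to come from spark-submit.

```scala
import org.apache.spark.sql.SparkSession

object CommitterConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("MergeEntities")
      // Options prefixed with "spark.hadoop." are copied (minus the prefix)
      // into the Hadoop Configuration used by the output committer.
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
      .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped", "true")
      .getOrCreate()

    // Print what actually reached the Hadoop Configuration, as a sanity check.
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    println(hadoopConf.get("mapreduce.fileoutputcommitter.algorithm.version"))
    println(hadoopConf.get("mapreduce.fileoutputcommitter.cleanup.skipped"))

    spark.stop()
  }
}
```

The same two options could equally be given as --conf arguments to spark-submit; the builder form is used here only to match the snippet quoted below.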
> Even better: use HDFS as the intermediate store for work, only do a bulk
> upload at the end.
>
> >
> >   val spark = SparkSession.builder()
> >                 .appName("MergeEntities")
> >                 .config("spark.sql.warehouse.dir", mergeConfig.getString("sparkSqlWarehouseDir"))
> >                 .config("fs.s3a.buffer.dir", "/tmp")
> >                 .config("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
> >                 .config("mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
> >                 .config("mapreduce.use.directfileoutputcommitter", "true")
> >                 //.config("spark.sql.sources.outputCommitterClass", classOf[DirectOutputCommitter].getCanonicalName)
> >                 .getOrCreate()
> >
> > Srikanth
>
>
