Re: Spark with S3 DirectOutputCommitter

2016-09-13 Thread Steve Loughran

On 12 Sep 2016, at 19:58, Srikanth wrote:

> Thanks Steve!
>
> We are already using HDFS as an intermediate store. This is for the last stage
> of processing, which has to put the data in S3.
> The output is partitioned by 3 fields, like
> .../field1=111/field2=999/date=YYYY-MM-DD/*
> Given that there are 100s of folders and 1000s of subfolders and part files,
> renaming from _temporary is just not practical on S3.
> I guess we have to add another stage with S3DistCp?

Afraid so
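
As a rough sketch of what that extra stage could look like, assuming the job has
already committed its partitioned output to HDFS; the paths and bucket name below
are made up, and plain hadoop distcp stands in for EMR's s3-dist-cp:

  import scala.sys.process._

  // Illustrative paths only.
  val src  = "hdfs:///tmp/merge-output"
  val dest = "s3a://my-bucket/merge-output"

  // Run the bulk copy as its own final stage, after the Spark job has committed
  // to HDFS. On EMR, s3-dist-cp can be used in place of hadoop distcp.
  val exitCode = Seq("hadoop", "distcp", src, dest).!
  require(exitCode == 0, s"distcp from $src to $dest failed with exit code $exitCode")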


Re: Spark with S3 DirectOutputCommitter

2016-09-12 Thread Srikanth
Thanks Steve!

We are already using HDFS as an intermediate store. This is for the last
stage of processing, which has to put the data in S3.
The output is partitioned by 3 fields, like
.../field1=111/field2=999/date=YYYY-MM-DD/*
Given that there are 100s of folders and 1000s of subfolders and part
files, renaming from _temporary is just not practical on S3.
I guess we have to add another stage with S3DistCp?

Srikanth



Re: Spark with S3 DirectOutputCommitter

2016-09-11 Thread Steve Loughran

> On 9 Sep 2016, at 21:54, Srikanth wrote:
> 
> Hello,
> 
> I'm trying to use the DirectOutputCommitter for s3a in Spark 2.0. I've tried a
> few configs and none of them seem to work.
> Output always creates a _temporary directory, and the rename is killing performance.

> I read some notes about DirectOutputCommitter causing problems with
> speculation turned on. Was this option removed entirely?

Spark turns off any committer with the word "direct" in its name if
speculation==true, because concurrent speculative attempts would each write
straight to the destination with no way to commit just one of them.

Even with speculation off, the trouble with the direct committers is that
executor or job failures can leave incomplete or inconsistent output behind, and
anything downstream won't even notice.

There's work underway to address this; it requires a consistent metadata store
alongside S3 (HADOOP-13345: S3Guard).

For now: stay with the file output committer

mapreduce.fileoutputcommitter.algorithm.version=2
mapreduce.fileoutputcommitter.cleanup.skipped=true
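
One way to pass those two properties through Spark is the spark.hadoop. prefix on
the session builder; a minimal sketch, reusing the app name from the original post
and keeping speculation off as discussed above (the cleanup.skipped key needs a
Hadoop version recent enough to ship it):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("MergeEntities")
    // v2 algorithm: task output is renamed into the final location at task commit,
    // so the single-threaded job commit has far less renaming left to do
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    // skip deleting the _temporary directory during job cleanup (slow on S3)
    .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped", "true")
    // direct-style committers are disabled under speculation anyway; keep it off for S3 output
    .config("spark.speculation", "false")
    .getOrCreate()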

Even better: use HDFS as the intermediate store for the work, and only do a bulk
upload to S3 at the end.
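
A minimal sketch of that pattern, assuming a final-stage DataFrame with the three
partition columns mentioned earlier in the thread; the table name, HDFS path, and
output format (Parquet) are made up for illustration:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("MergeEntities").getOrCreate()

  // Stand-in for the real final-stage DataFrame.
  val merged = spark.table("merged_entities")

  // Commit the partitioned output on HDFS, where renames are cheap.
  merged.write
    .partitionBy("field1", "field2", "date")
    .mode("overwrite")
    .parquet("hdfs:///tmp/merge-output")

  // The finished tree is then pushed to S3 in one bulk step
  // (hadoop distcp or s3-dist-cp).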




Spark with S3 DirectOutputCommitter

2016-09-09 Thread Srikanth
Hello,

I'm trying to use the DirectOutputCommitter for s3a in Spark 2.0. I've tried a
few configs and none of them seem to work.
Output always creates a _temporary directory, and the rename is killing performance.
I read some notes about DirectOutputCommitter causing problems with
speculation turned on. Was this option removed entirely?

  val spark = SparkSession.builder()
    .appName("MergeEntities")
    .config("spark.sql.warehouse.dir", mergeConfig.getString("sparkSqlWarehouseDir"))
    .config("fs.s3a.buffer.dir", "/tmp")
    .config("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
    .config("mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
    .config("mapreduce.use.directfileoutputcommitter", "true")
    //.config("spark.sql.sources.outputCommitterClass", classOf[DirectOutputCommitter].getCanonicalName)
    .getOrCreate()

Srikanth