Re: Which OutputCommitter to use for S3?

2015-02-26 Thread Thomas Demoor
FYI. We're currently addressing this at the Hadoop level in
https://issues.apache.org/jira/browse/HADOOP-9565


Thomas Demoor

On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath  wrote:

> Just to close the loop in case anyone runs into the same problem I had.
>
> By setting --hadoop-major-version=2 when using the ec2 scripts, everything
> worked fine.
>
> Darin.
>
>
> - Original Message -
> From: Darin McBeath 
> To: Mingyu Kim ; Aaron Davidson 
> Cc: "user@spark.apache.org" 
> Sent: Monday, February 23, 2015 3:16 PM
> Subject: Re: Which OutputCommitter to use for S3?
>
> Thanks.  I think my problem might actually be the other way around.
>
> I'm compiling with Hadoop 2, but when I start up Spark using the ec2
> scripts, I don't specify --hadoop-major-version and the default is 1.  I'm
> guessing that if I make that 2 it might work correctly.  I'll try it and post
> a response.
>
>
> - Original Message -
> From: Mingyu Kim 
> To: Darin McBeath ; Aaron Davidson <
> ilike...@gmail.com>
> Cc: "user@spark.apache.org" 
> Sent: Monday, February 23, 2015 3:06 PM
> Subject: Re: Which OutputCommitter to use for S3?
>
> Cool, we will start from there. Thanks Aaron and Josh!
>
> Darin, it's likely because the DirectOutputCommitter is compiled with
> Hadoop 1 classes and you're running it with Hadoop 2.
> org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it
> became an interface in Hadoop 2.
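>
> A quick way to check which flavour of JobContext is on the runtime classpath
> (a small diagnostic sketch, assuming the Hadoop jars are available; the class
> name below is the one from the stack trace, the rest is illustrative):
>
> public class JobContextCheck {
>   public static void main(String[] args) throws ClassNotFoundException {
>     Class<?> c = Class.forName("org.apache.hadoop.mapred.JobContext");
>     // Prints "interface" on a Hadoop 2 classpath and "class" on Hadoop 1.
>     // A committer compiled against one and run against the other fails with
>     // exactly this IncompatibleClassChangeError.
>     System.out.println(c.isInterface() ? "interface" : "class");
>   }
> }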
>
> Mingyu
>
>
>
>
>
> On 2/23/15, 11:52 AM, "Darin McBeath"  wrote:
>
> >Aaron.  Thanks for the class. Since I'm currently writing Java-based
> >Spark applications, I tried converting your class to Java (it seemed
> >pretty straightforward).
> >
> >I set up the use of the class as follows:
> >
> >SparkConf conf = new SparkConf()
> >.set("spark.hadoop.mapred.output.committer.class",
> >"com.elsevier.common.DirectOutputCommitter");
> >
> >And I then try to save a file to S3 (which I believe should use the old
> >Hadoop APIs).
> >
> >JavaPairRDD newBaselineRDDWritable =
> >reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
> >newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
> >Text.class, Text.class, SequenceFileOutputFormat.class,
> >org.apache.hadoop.io.compress.GzipCodec.class);
> >
> >But, I get the following error message.
> >
> >Exception in thread "main" java.lang.IncompatibleClassChangeError: Found
> >class org.apache.hadoop.mapred.JobContext, but interface was expected
> >at com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68)
> >at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
> >at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075)
> >at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
> >at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902)
> >at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771)
> >at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
> >
> >In my class, JobContext is of type org.apache.hadoop.mapred.JobContext
> >(the interface).
> >
> >Is there something obvious that I might be doing wrong (or messed up in
> >the translation from Scala to Java) or something I should look into?  I'm
> >using Spark 1.2 with Hadoop 2.4.
> >
> >
> >Thanks.
> >
> >Darin.
> >
> >
> >
> >
> >
> >From: Aaron Davidson 
> >To: Andrew Ash 
> >Cc: Josh Rosen ; Mingyu Kim ;
> >"user@spark.apache.org" ; Aaron Davidson
> >
> >Sent: Saturday, February 21, 2015 7:01 PM
> >Subject: Re: Which OutputCommitter to use for S3?
> >
> >
> >
> >Here is the class:
> >
> >https://gist.github.com/aarondav/c513916e72101bbe14ec
> >
> >You can use it by setting "mapred.output.committer.class" in the Hadoop
> >configuration (or "spark.hadoop.mapred.output.committer.class" in the
> >Spark configuration). Note that this only 

Re: performance of saveAsTextFile moving files from _temporary

2015-01-28 Thread Thomas Demoor
TLDR: Extend FileOutputCommitter to eliminate the _temporary storage step. There
are some implementations to be found online, typically called
DirectOutputCommitter, for instance this Spark pull request
<https://github.com/themodernlife/spark/commit/4359664b1d557d55b0579023df809542386d5b8c>.
Tell Spark to use your shiny new committer when writing to an object store
and all will be well.

Aaron is on the right track, but this renaming is bottlenecked by the object
storage system itself, irrespective of whether it is executed in the driver or
the executors. Object stores (S3, Google, Azure, Amplidata :P...) do not have a
native rename (it is implemented as a server-side copy operation, so its
duration is proportional to the object size). By default, Hadoop (and thus
also Spark) uses a rename from the temporary to the final output path to enable
both retrying and speculative execution, because HDFS is a single-writer system,
so multiple "job attempts" cannot write to the final output concurrently.
Object stores do allow multiple concurrent writes to the same output, which
is exactly what makes a native rename nigh impossible. The solution is to
enable this concurrent writing instead of renaming to the final output by using
a custom OutputCommitter which does not use a temporary location.
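
For concreteness, here is a minimal sketch of such a committer against the old
org.apache.hadoop.mapred API. It follows the spirit of the DirectOutputCommitter
implementations linked above rather than reproducing any of them: it extends the
plain OutputCommitter (not FileOutputCommitter), so the old-API FileOutputFormat
should not route task output through a _temporary work path at all. Be aware that
without a temporary location there is also no automatic cleanup of output left
behind by failed or speculative task attempts.

import java.io.IOException;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;

// Sketch: tasks write straight to the final output location, so there is
// nothing to set up, nothing to rename and nothing to commit.
public class DirectOutputCommitter extends OutputCommitter {

  @Override
  public void setupJob(JobContext jobContext) throws IOException {
    // No _temporary directory is created.
  }

  @Override
  public void setupTask(TaskAttemptContext taskContext) throws IOException {
    // Nothing to prepare per task attempt.
  }

  @Override
  public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
    // Output is already in place, so no copy/rename on the object store.
    return false;
  }

  @Override
  public void commitTask(TaskAttemptContext taskContext) throws IOException {
    // No-op.
  }

  @Override
  public void abortTask(TaskAttemptContext taskContext) throws IOException {
    // No-op: there is no temporary output to discard.
  }

  @Override
  public void commitJob(JobContext jobContext) throws IOException {
    // Optionally touch a _SUCCESS marker here; omitted in this sketch.
  }
}

Plugging it in is then a matter of pointing mapred.output.committer.class (or
spark.hadoop.mapred.output.committer.class in the Spark configuration) at the
fully qualified class name, compiled against the same Hadoop major version your
cluster runs.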

Thomas Demoor
skype: demoor.thomas
mobile: +32 497883833

On Wed, Jan 28, 2015 at 3:54 AM, Josh Walton  wrote:

> I'm not sure how to confirm how the moving is happening. However, one of
> the jobs I was talking about, with 9k files of 4 MB each, just completed.
> The Spark UI showed the job as complete after ~2 hours. The last four hours
> of the job were spent just moving the files from _temporary to their final
> destination. The tasks for the write were definitely shown as complete, and no
> logging is happening on the master or workers. The last line of my Java
> code logs, but the job sits there while the moving of files happens.
>
> On Tue, Jan 27, 2015 at 7:24 PM, Aaron Davidson 
> wrote:
>
>> This renaming from _temporary to the final location is actually done by
>> executors, in parallel, for saveAsTextFile. It should be performed by each
>> task individually before it returns.
>>
>> I have seen an issue similar to what you mention dealing with Hive code
>> which did the renaming serially on the driver, which is very slow for S3
>> (and possibly Google Storage as well), as it actually copies the data
>> rather than doing a metadata-only operation during rename. However, this
>> should not be an issue in this case.
>>
>> Could you confirm how the moving is happening -- i.e., on the executors
>> or the driver?
>>
>> On Tue, Jan 27, 2015 at 4:31 PM, jwalton  wrote:
>>
>>> We are running Spark in Google Compute Engine using their One-Click Deploy.
>>> By doing so, we get their Google Cloud Storage connector for Hadoop for
>>> free, meaning we can specify gs:// paths for input and output.
>>>
>>> We have jobs that take a couple of hours and end up with ~9k partitions,
>>> which means 9k output files. After the job is "complete" it then moves the
>>> output files from our $output_path/_temporary to $output_path. That process
>>> can take longer than the job itself, depending on the circumstances. The job
>>> I mentioned previously outputs ~4 MB files, and so far has copied 1/3 of the
>>> files in 1.5 hours from _temporary to the final destination.
>>>
>>> Is there a solution to this besides reducing the number of partitions?
>>> Anyone else run into similar issues elsewhere? I don't remember this being
>>> an issue with MapReduce jobs and Hadoop; however, I probably wasn't
>>> tracking the transfer of the output files like I am with Spark.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/performance-of-saveAsTextFile-moving-files-from-temporary-tp21397.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>>
>>
>


Re: SaveAsTextFile to S3 bucket

2015-01-27 Thread Thomas Demoor
S3 does not have the concept of a "directory". An S3 bucket only holds files
(objects). The Hadoop filesystem is mapped onto a bucket and uses
Hadoop-specific (or rather "s3 tool"-specific: s3n uses the jets3t library)
conventions (hacks) to fake directories, such as an object name ending with a
slash ("filename/") or, with s3n, "filename_$folder$" (these are leaky
abstractions; google that if you ever have some spare time :p). S3 simply
doesn't (and shouldn't) know about these conventions. Again, a bucket just
holds a shitload of files. This might seem inconvenient, but directories are a
really bad idea for scalable storage. However, setting "folder-like"
permissions can be done through IAM:
http://docs.aws.amazon.com/AmazonS3/latest/dev/example-policies-s3.html#iam-policy-ex1

Summarizing: by setting permissions on /dev you set permissions on that
object. It has no effect on the file "/dev/output", which is, as far as S3
is concerned, another object that happens to share part of its object name
with /dev.
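
For illustration, here is a sketch of a prefix-scoped policy along the lines of
the linked AWS example, using the bucket and prefix from this thread. Treat it
as an assumption-laden starting point rather than a tested policy: the exact
actions and keys s3n/jets3t needs depend on the requests it makes (for example
the HEAD on the 'dev' key behind the 403 quoted below).

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ListKeysUnderDevPrefix",
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": ["arn:aws:s3:::nexgen-software"],
      "Condition": {"StringLike": {"s3:prefix": ["dev/*"]}}
    },
    {
      "Sid": "ReadWriteObjectsUnderDev",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
      "Resource": [
        "arn:aws:s3:::nexgen-software/dev",
        "arn:aws:s3:::nexgen-software/dev/*"
      ]
    }
  ]
}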

Thomas Demoor
skype: demoor.thomas
mobile: +32 497883833

On Tue, Jan 27, 2015 at 6:33 AM, Chen, Kevin  wrote:

>  When Spark saves an RDD to a text file, the directory must not exist
> upfront. It will create the directory and write the data to part- files under
> it. In my use case, I created a directory dev in the bucket:
> s3n://nexgen-software/dev. I expected it to create output directly under dev
> and the part- files under output. But it gave me an exception, as I had only
> given write permission to dev, not the bucket. If I open up write permission
> on the bucket, it works, but it does not create the output directory under
> dev; it rather creates another dev/output directory under the bucket. I just
> want to know if it is possible to have the output directory created under the
> dev directory I created upfront.
>
>   From: Nick Pentreath 
> Date: Monday, January 26, 2015 9:15 PM
> To: "user@spark.apache.org" 
> Subject: Re: SaveAsTextFile to S3 bucket
>
>   Your code specifies the output folder as
>
>  rdd.saveAsTextFile("s3n://nexgen-software/dev/output");
>
>  So it will try to write to /dev/output, which is as expected. If you
> create the directory /dev/output upfront in your bucket and try to save
> to that (empty) directory, what is the behaviour?
>
> On Tue, Jan 27, 2015 at 6:21 AM, Chen, Kevin 
> wrote:
>
>>  Does anyone know if I can save an RDD as a text file to a pre-created
>> directory in an S3 bucket?
>>
>>  I have a directory created in the S3 bucket: //nexgen-software/dev
>>
>>  When I tried to save an RDD as a text file in this directory:
>> rdd.saveAsTextFile("s3n://nexgen-software/dev/output");
>>
>>
>>  I got following exception at runtime:
>>
>> Exception in thread "main" org.apache.hadoop.fs.s3.S3Exception:
>> org.jets3t.service.S3ServiceException: S3 HEAD request failed for '/dev' -
>> ResponseCode=403, ResponseMessage=Forbidden
>>
>>
>>  I have verified /dev has write permission. However, if I grant the
>> bucket //nexgen-software write permission, I don't get the exception. But the
>> output is not created under dev; rather, a different /dev/output directory
>> is created in the bucket (//nexgen-software). Is this how
>> saveAsTextFile behaves on S3? Is there any way I can have the output created
>> under a pre-defined directory?
>>
>>
>>  Thanks in advance.
>>
>>
>>
>>
>>
>


Re: Spark and S3 server side encryption

2015-01-27 Thread Thomas Demoor
Spark uses the Hadoop filesystems.

I assume you are trying to use s3n://, which, under the hood, uses the
third-party jets3t library. It is configured through the jets3t.properties file
(google "hadoop s3n jets3t"), which you should put on Spark's classpath. The
setting you are looking for is s3service.server-side-encryption.
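
For example, a jets3t.properties along these lines on the classpath should be
enough (the property name is the one above; AES256 is, to the best of my
knowledge, the value jets3t expects for S3-managed server-side encryption, so
double-check it against the jets3t documentation for the version bundled with
your Hadoop):

# jets3t.properties -- picked up from the classpath by the s3n connector
s3service.server-side-encryption=AES256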

The latest version of Hadoop (2.6) introduces a new and improved s3a://
filesystem, which has the official SDK from Amazon under the hood.


On Mon, Jan 26, 2015 at 10:01 PM, curtkohler  wrote:

> We are trying to create a Spark job that writes out a file to S3 that
> leverages S3's server-side encryption for sensitive data. Typically this is
> accomplished by setting the appropriate header on the PUT request, but it
> isn't clear whether this capability is exposed in the Spark/Hadoop APIs.
> Does anyone have any suggestions?
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-S3-server-side-encryption-tp21377.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>