Re: Reading multiple S3 objects, transforming, writing back one

2014-05-04 Thread Peter
Hi Patrick

I should probably explain my use case in a bit more detail. I have hundreds of 
thousands to millions of clients uploading events to my pipeline; these are 
batched periodically (every 60 seconds at the moment) into logs which are dumped 
into S3 (and uploaded into a data warehouse). I need to post-process these at 
another set interval (say hourly), mainly to dedup and globally sort them, and 
then roll them up into a single file. I will repeat this process daily (I need 
to experiment with the granularity here, but daily feels appropriate) to yield a 
single file for the day's data. 
I'm hoping to use Spark for these roll ups; it seems so much easier than using 
Hadoop.

The roll up is to avoid the Hadoop "small files" problem; here's one article 
discussing it, "The Small Files Problem" on blog.cloudera.com: "Small files are 
a big problem in Hadoop — or, at least, they are if the number of questions on 
the user list on this topic is anything to go by."

Inspecting my dataset should be much more efficient and manageable with hundreds, 
or maybe low thousands, of larger partitions rather than millions. 
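
To make that concrete, here's a rough, untested sketch of the kind of roll-up 
job I have in mind (the bucket, prefixes and sort key are placeholders, and sc 
is an existing SparkContext):

    import org.apache.spark.SparkContext._   // implicits for sortByKey on pair RDDs

    // read one interval's worth of small log objects, dedup, globally sort,
    // then write back far fewer, larger files
    val raw = sc.textFile("s3n://bucket/2014-04-28/")
    val rolledUp = raw
      .distinct()                               // drop duplicate events
      .map(line => (line.split(",")(0), line))  // key by e.g. a timestamp column
      .sortByKey()                              // global sort across the dataset
      .values
      .coalesce(1)                              // single partition => single part file
    rolledUp.saveAsTextFile("s3n://bucket/rolled-up/2014-04-28/")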

Hope that makes some sense :)

Thanks
Peter
On Saturday, May 3, 2014 5:12 PM, Patrick Wendell  wrote:
 
Hi Peter,

If your dataset is large (3GB) then why not just chunk it into
multiple files? You'll need to do this anyways if you want to
read/write this from S3 quickly, because S3's throughput per file is
limited (as you've seen).

This is exactly why the Hadoop API lets you save datasets with many
partitions, since often there are bottlenecks at the granularity of a
file.

Is there a reason you need this to be exactly one file?

- Patrick


On Sat, May 3, 2014 at 4:14 PM, Chris Fregly  wrote:
> not sure if this directly addresses your issue, peter, but it's worth
> mentioning a handy AWS EMR utility called s3distcp that can upload a single
> HDFS file - in parallel - to a single, concatenated S3 file once all the
> partitions are uploaded.  kinda cool.
>
> here's some info:
>
> http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html
>
> s3distcp is an extension of the familiar hadoop distcp, of course.
>
>
> On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas
>  wrote:
>>
>> The fastest way to save to S3 should be to leave the RDD with many
>> partitions, because all partitions will be written out in parallel.
>>
>> Then, once the various parts are in S3, somehow concatenate the files
>> together into one file.
>>
>> If this can be done within S3 (I don't know if this is possible), then you
>> get the best of both worlds: a highly parallelized write to S3, and a single
>> cleanly named output file.
>>
>>
>> On Thu, May 1, 2014 at 12:52 PM, Peter  wrote:
>>>
>>> Thank you Patrick.
>>>
>>> I took a quick stab at it:
>>>
>>>     val s3Client = new AmazonS3Client(...)
>>>     val copyObjectResult = s3Client.copyObject("upload", outputPrefix +
>>> "/part-0", "rolled-up-logs", "2014-04-28.csv")
>>>     val objectListing = s3Client.listObjects("upload", outputPrefix)
>>>     s3Client.deleteObjects(new
>>> DeleteObjectsRequest("upload").withKeys(objectListing.getObjectSummaries.asScala.map(s
>>> => new KeyVersion(s.getKey)).asJava))
>>>
>>> Using a 3GB object I achieved about 33MB/s between buckets in the same
>>> AZ.
>>>
>>> This is a workable solution for the short term but not ideal for the
>>> longer term as data size increases. I understand it's a limitation of the
>>> Hadoop API but ultimately it must be possible to dump a RDD to a single S3
>>> object :)
>>>
>>> On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell
>>>  wrote:
>>> This is a consequence of the way the Hadoop files API works. However,
>>> you can (fairly easily) add code to just rename the file because it
>>> will always produce the same filename.
>>>
>>> (heavy use of pseudo code)
>>>
>>> dir = "/some/dir"
>>> rdd.coalesce(1).saveAsTextFile(dir)
>>> f = new File(dir + "part-0")
>>> f.moveTo("somewhere else")
>>> dir.remove()
>>>
>>> It might be cool to add a utility called `saveAsSingleFile` or
>>> something that does this for you. In fact probably we should have
>>> called saveAsTextfile "saveAsTextFiles" to make it more clear...
>>>
>>> On Wed, Apr 30, 2014 at 2:00 PM, Peter  wrote:
>>> > Thanks Nicholas, this is a bit of a shame, not very practical for log
>>> > roll
>>> > up for example when every output needs to be in its own "directory".
>>> > On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
>>> >  wrote:
>>> > Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
>>> > coalesce(1), you move everything in the RDD to a single partition,
>>> > which
>>> > then gives you 1 output file.
>>> > It will still be called part-0 or something like that because
>>> > that's
>>> > defined by the Hadoop API that Spark uses for reading to/writing from
>>> > S3. I
>>> > don't know of a way to change that.
>>> >
>>> >
>>> > On Wed, Apr 30, 2014 at 2:47 PM

Re: Reading multiple S3 objects, transforming, writing back one

2014-05-04 Thread Peter
Thank you Chris. I am familiar with s3distcp; I'm trying to replicate some of 
that functionality and combine it with my log post-processing in one step 
instead of adding yet another step. 
On Saturday, May 3, 2014 4:15 PM, Chris Fregly  wrote:
 
not sure if this directly addresses your issue, peter, but it's worth mentioning 
a handy AWS EMR utility called s3distcp that can upload a single HDFS file - in 
parallel - to a single, concatenated S3 file once all the partitions are 
uploaded.  kinda cool.

here's some info:

http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html
 

s3distcp is an extension of the familiar hadoop distcp, of course.



On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas  
wrote:

The fastest way to save to S3 should be to leave the RDD with many partitions, 
because all partitions will be written out in parallel. 
>
>
>Then, once the various parts are in S3, somehow concatenate the files together 
>into one file. 
>
>
>If this can be done within S3 (I don't know if this is possible), then you get 
>the best of both worlds: a highly parallelized write to S3, and a single 
>cleanly named output file.
>
>
>
>On Thu, May 1, 2014 at 12:52 PM, Peter  wrote:
>
>Thank you Patrick. 
>>
>>
>>I took a quick stab at it:
>>
>>
>>    val s3Client = new AmazonS3Client(...)
>>    val copyObjectResult = s3Client.copyObject("upload", outputPrefix + 
>>"/part-0", "rolled-up-logs", "2014-04-28.csv")
>>    val objectListing = s3Client.listObjects("upload", outputPrefix)
>>    s3Client.deleteObjects(new 
>>DeleteObjectsRequest("upload").withKeys(objectListing.getObjectSummaries.asScala.map(s
>> => new KeyVersion(s.getKey)).asJava))
>>
>>
>>Using a 3GB object I achieved about 33MB/s between buckets in the same AZ. 
>>
>>
>>This is a workable solution for the short term but not ideal for the longer 
>>term as data size increases. I understand it's a limitation of the Hadoop API 
>>but ultimately it must be possible to dump a RDD to a single S3 object :) 
>>
>>
>>On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell  
>>wrote:
>> 
>>This is a consequence of the way the Hadoop files API works. However,
>>you can (fairly easily) add code to just rename the file because it
>>will always produce the same filename.
>>
>>(heavy use of pseudo code)
>>
>>dir = "/some/dir"
>>rdd.coalesce(1).saveAsTextFile(dir)
>>f = new File(dir + "part-0")
>>f.moveTo("somewhere else")
>>dir.remove()
>>
>>It might be cool to add a utility called `saveAsSingleFile` or
>>something that does this for you. In fact probably we should have
>>called saveAsTextfile "saveAsTextFiles" to make it more clear...
>>
>>
>>On Wed, Apr 30, 2014 at 2:00 PM, Peter  wrote:
>>> Thanks Nicholas, this is a bit of a shame, not very practical for log roll
>>> up for example when every output needs to be in its own "directory".
>>> On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
>>>  wrote:
>>> Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
>>> coalesce(1), you move everything in the RDD to a single partition, which
>>> then gives you 1 output file.
>>> It will still be called part-0 or something like that because that's
>>> defined by the Hadoop API that Spark uses for reading to/writing from S3. I
>>> don't know of a way to change that.
>>>
>>>
>>> On Wed, Apr 30, 2014 at 2:47 PM, Peter  wrote:
>>>
>>> Ah, looks like RDD.coalesce(1) solves one part of the problem.
>>> On Wednesday, April 30, 2014 11:15 AM, Peter 
>>> wrote:
>>> Hi
>>>
>>> Playing around with Spark & S3, I'm opening multiple objects (CSV files)
>>> with:
>>>
>>>     val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>>>
>>> so hfile is a RDD representing 10 objects that were "underneath" 2014-04-28.
>>> After I've sorted and otherwise transformed the content, I'm trying to write
>>> it back to a single object:
>>>
>>>
>>> sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>>>
>>> unfortunately this results in a "folder" named concatted.csv with 10 objects
>>> underneath, part-0 .. part-00010, corresponding to the 10 original
>>> objects loaded.
>>>
>>> How can I achieve the desired behaviour of putting a single object named
>>> concatted.csv ?
>>>
>>> I've tried 0.9.1 and 1.0.0-RC3.
>>>
>>> Thanks!
>>> Peter
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>

Re: Reading multiple S3 objects, transforming, writing back one

2014-05-04 Thread Nicholas Chammas
Chris,

To use s3distcp in this case, are you suggesting saving the RDD to
local/ephemeral HDFS and then copying it up to S3 using this tool?
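
I.e. something like this (purely illustrative, paths made up)?

    // write to the cluster's ephemeral HDFS with full parallelism...
    rdd.saveAsTextFile("hdfs:///tmp/rolled-up-logs/2014-04-28/")
    // ...then run s3distcp as a separate EMR step to copy/concatenate the
    // part files from HDFS up to S3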


On Sat, May 3, 2014 at 7:14 PM, Chris Fregly  wrote:

> not sure if this directly addresses your issue, peter, but it's worth
> mentioning a handy AWS EMR utility called s3distcp that can upload a single
> HDFS file - in parallel - to a single, concatenated S3 file once all the
> partitions are uploaded.  kinda cool.
>
> here's some info:
>
>
> http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html
>
>
> s3distcp is an extension of the familiar hadoop distcp, of course.
>
>
> On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> The fastest way to save to S3 should be to leave the RDD with many
>> partitions, because all partitions will be written out in parallel.
>>
>> Then, once the various parts are in S3, somehow concatenate the files
>> together into one file.
>>
>> If this can be done within S3 (I don't know if this is possible), then
>> you get the best of both worlds: a highly parallelized write to S3, and a
>> single cleanly named output file.
>>
>>
>> On Thu, May 1, 2014 at 12:52 PM, Peter  wrote:
>>
>>> Thank you Patrick.
>>>
>>> I took a quick stab at it:
>>>
>>> val s3Client = new AmazonS3Client(...)
>>> val copyObjectResult = s3Client.copyObject("upload", outputPrefix +
>>> "/part-0", "rolled-up-logs", "2014-04-28.csv")
>>> val objectListing = s3Client.listObjects("upload", outputPrefix)
>>> s3Client.deleteObjects(new
>>> DeleteObjectsRequest("upload").withKeys(objectListing.getObjectSummaries.asScala.map(s
>>> => new KeyVersion(s.getKey)).asJava))
>>>
>>>  Using a 3GB object I achieved about 33MB/s between buckets in the same
>>> AZ.
>>>
>>> This is a workable solution for the short term but not ideal for the
>>> longer term as data size increases. I understand it's a limitation of the
>>> Hadoop API but ultimately it must be possible to dump a RDD to a single S3
>>> object :)
>>>
>>>   On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell <
>>> pwend...@gmail.com> wrote:
>>>  This is a consequence of the way the Hadoop files API works. However,
>>> you can (fairly easily) add code to just rename the file because it
>>> will always produce the same filename.
>>>
>>> (heavy use of pseudo code)
>>>
>>> dir = "/some/dir"
>>> rdd.coalesce(1).saveAsTextFile(dir)
>>> f = new File(dir + "part-0")
>>> f.moveTo("somewhere else")
>>> dir.remove()
>>>
>>> It might be cool to add a utility called `saveAsSingleFile` or
>>> something that does this for you. In fact probably we should have
>>> called saveAsTextfile "saveAsTextFiles" to make it more clear...
>>>
>>> On Wed, Apr 30, 2014 at 2:00 PM, Peter  wrote:
>>> > Thanks Nicholas, this is a bit of a shame, not very practical for log
>>> roll
>>> > up for example when every output needs to be in its own "directory".
>>> > On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
>>> >  wrote:
>>> > Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
>>> > coalesce(1), you move everything in the RDD to a single partition,
>>> which
>>> > then gives you 1 output file.
>>> > It will still be called part-0 or something like that because
>>> that's
>>> > defined by the Hadoop API that Spark uses for reading to/writing from
>>> S3. I
>>> > don't know of a way to change that.
>>> >
>>> >
>>> > On Wed, Apr 30, 2014 at 2:47 PM, Peter 
>>> wrote:
>>> >
>>> > Ah, looks like RDD.coalesce(1) solves one part of the problem.
>>> > On Wednesday, April 30, 2014 11:15 AM, Peter 
>>> > wrote:
>>> > Hi
>>> >
>>> > Playing around with Spark & S3, I'm opening multiple objects (CSV
>>> files)
>>> > with:
>>> >
>>> >val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>>> >
>>> > so hfile is a RDD representing 10 objects that were "underneath"
>>> 2014-04-28.
>>> > After I've sorted and otherwise transformed the content, I'm trying to
>>> write
>>> > it back to a single object:
>>> >
>>> >
>>> >
>>> sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>>> >
>>> > unfortunately this results in a "folder" named concatted.csv with 10
>>> objects
>>> > underneath, part-0 .. part-00010, corresponding to the 10 original
>>> > objects loaded.
>>> >
>>> > How can I achieve the desired behaviour of putting a single object
>>> named
>>> > concatted.csv ?
>>> >
>>> > I've tried 0.9.1 and 1.0.0-RC3.
>>> >
>>> > Thanks!
>>> > Peter
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>>
>>>
>>>
>>
>


Re: Reading multiple S3 objects, transforming, writing back one

2014-05-03 Thread Patrick Wendell
Hi Peter,

If your dataset is large (3GB) then why not just chunk it into
multiple files? You'll need to do this anyways if you want to
read/write this from S3 quickly, because S3's throughput per file is
limited (as you've seen).

This is exactly why the Hadoop API lets you save datasets with many
partitions, since often there are bottlenecks at the granularity of a
file.

Is there a reason you need this to be exactly one file?

- Patrick

On Sat, May 3, 2014 at 4:14 PM, Chris Fregly  wrote:
> not sure if this directly addresses your issue, peter, but it's worth
> mentioning a handy AWS EMR utility called s3distcp that can upload a single
> HDFS file - in parallel - to a single, concatenated S3 file once all the
> partitions are uploaded.  kinda cool.
>
> here's some info:
>
> http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html
>
> s3distcp is an extension of the familiar hadoop distcp, of course.
>
>
> On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas
>  wrote:
>>
>> The fastest way to save to S3 should be to leave the RDD with many
>> partitions, because all partitions will be written out in parallel.
>>
>> Then, once the various parts are in S3, somehow concatenate the files
>> together into one file.
>>
>> If this can be done within S3 (I don't know if this is possible), then you
>> get the best of both worlds: a highly parallelized write to S3, and a single
>> cleanly named output file.
>>
>>
>> On Thu, May 1, 2014 at 12:52 PM, Peter  wrote:
>>>
>>> Thank you Patrick.
>>>
>>> I took a quick stab at it:
>>>
>>> val s3Client = new AmazonS3Client(...)
>>> val copyObjectResult = s3Client.copyObject("upload", outputPrefix +
>>> "/part-0", "rolled-up-logs", "2014-04-28.csv")
>>> val objectListing = s3Client.listObjects("upload", outputPrefix)
>>> s3Client.deleteObjects(new
>>> DeleteObjectsRequest("upload").withKeys(objectListing.getObjectSummaries.asScala.map(s
>>> => new KeyVersion(s.getKey)).asJava))
>>>
>>> Using a 3GB object I achieved about 33MB/s between buckets in the same
>>> AZ.
>>>
>>> This is a workable solution for the short term but not ideal for the
>>> longer term as data size increases. I understand it's a limitation of the
>>> Hadoop API but ultimately it must be possible to dump a RDD to a single S3
>>> object :)
>>>
>>> On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell
>>>  wrote:
>>> This is a consequence of the way the Hadoop files API works. However,
>>> you can (fairly easily) add code to just rename the file because it
>>> will always produce the same filename.
>>>
>>> (heavy use of pseudo code)
>>>
>>> dir = "/some/dir"
>>> rdd.coalesce(1).saveAsTextFile(dir)
>>> f = new File(dir + "part-0")
>>> f.moveTo("somewhere else")
>>> dir.remove()
>>>
>>> It might be cool to add a utility called `saveAsSingleFile` or
>>> something that does this for you. In fact probably we should have
>>> called saveAsTextfile "saveAsTextFiles" to make it more clear...
>>>
>>> On Wed, Apr 30, 2014 at 2:00 PM, Peter  wrote:
>>> > Thanks Nicholas, this is a bit of a shame, not very practical for log
>>> > roll
>>> > up for example when every output needs to be in it's own "directory".
>>> > On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
>>> >  wrote:
>>> > Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
>>> > coalesce(1), you move everything in the RDD to a single partition,
>>> > which
>>> > then gives you 1 output file.
>>> > It will still be called part-0 or something like that because
>>> > that's
>>> > defined by the Hadoop API that Spark uses for reading to/writing from
>>> > S3. I
>>> > don't know of a way to change that.
>>> >
>>> >
>>> > On Wed, Apr 30, 2014 at 2:47 PM, Peter  wrote:
>>> >
>>> > Ah, looks like RDD.coalesce(1) solves one part of the problem.
>>> > On Wednesday, April 30, 2014 11:15 AM, Peter 
>>> > wrote:
>>> > Hi
>>> >
>>> > Playing around with Spark & S3, I'm opening multiple objects (CSV
>>> > files)
>>> > with:
>>> >
>>> >val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>>> >
>>> > so hfile is a RDD representing 10 objects that were "underneath"
>>> > 2014-04-28.
>>> > After I've sorted and otherwise transformed the content, I'm trying to
>>> > write
>>> > it back to a single object:
>>> >
>>> >
>>> >
>>> > sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>>> >
>>> > unfortunately this results in a "folder" named concatted.csv with 10
>>> > objects
>>> > underneath, part-0 .. part-00010, corresponding to the 10 original
>>> > objects loaded.
>>> >
>>> > How can I achieve the desired behaviour of putting a single object
>>> > named
>>> > concatted.csv ?
>>> >
>>> > I've tried 0.9.1 and 1.0.0-RC3.
>>> >
>>> > Thanks!
>>> > Peter
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>>
>>>
>>
>


Re: Reading multiple S3 objects, transforming, writing back one

2014-05-03 Thread Chris Fregly
not sure if this directly addresses your issue, peter, but it's worth
mentioning a handy AWS EMR utility called s3distcp that can upload a single
HDFS file - in parallel - to a single, concatenated S3 file once all the
partitions are uploaded.  kinda cool.

here's some info:

http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html


s3distcp is an extension of the familiar hadoop distcp, of course.
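
a typical invocation looks something like this (bucket and paths are made up; 
--groupBy collects the matching part files into one output file, --targetSize 
should be larger than the total data size if you want a single file, and the 
jar location varies by AMI, so check the docs above):

    hadoop jar /home/hadoop/lib/emr-s3distcp-1.0.jar \
      --src hdfs:///rolled-up-logs/2014-04-28/ \
      --dest s3://bucket/rolled-up-logs/ \
      --groupBy '.*(part-).*' \
      --targetSize 4096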


On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> The fastest way to save to S3 should be to leave the RDD with many
> partitions, because all partitions will be written out in parallel.
>
> Then, once the various parts are in S3, somehow concatenate the files
> together into one file.
>
> If this can be done within S3 (I don't know if this is possible), then you
> get the best of both worlds: a highly parallelized write to S3, and a
> single cleanly named output file.
>
>
> On Thu, May 1, 2014 at 12:52 PM, Peter  wrote:
>
>> Thank you Patrick.
>>
>> I took a quick stab at it:
>>
>> val s3Client = new AmazonS3Client(...)
>> val copyObjectResult = s3Client.copyObject("upload", outputPrefix +
>> "/part-0", "rolled-up-logs", "2014-04-28.csv")
>> val objectListing = s3Client.listObjects("upload", outputPrefix)
>> s3Client.deleteObjects(new
>> DeleteObjectsRequest("upload").withKeys(objectListing.getObjectSummaries.asScala.map(s
>> => new KeyVersion(s.getKey)).asJava))
>>
>>  Using a 3GB object I achieved about 33MB/s between buckets in the same
>> AZ.
>>
>> This is a workable solution for the short term but not ideal for the
>> longer term as data size increases. I understand it's a limitation of the
>> Hadoop API but ultimately it must be possible to dump a RDD to a single S3
>> object :)
>>
>>   On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell <
>> pwend...@gmail.com> wrote:
>>  This is a consequence of the way the Hadoop files API works. However,
>> you can (fairly easily) add code to just rename the file because it
>> will always produce the same filename.
>>
>> (heavy use of pseudo code)
>>
>> dir = "/some/dir"
>> rdd.coalesce(1).saveAsTextFile(dir)
>> f = new File(dir + "part-0")
>> f.moveTo("somewhere else")
>> dir.remove()
>>
>> It might be cool to add a utility called `saveAsSingleFile` or
>> something that does this for you. In fact probably we should have
>> called saveAsTextfile "saveAsTextFiles" to make it more clear...
>>
>> On Wed, Apr 30, 2014 at 2:00 PM, Peter  wrote:
>> > Thanks Nicholas, this is a bit of a shame, not very practical for log
>> roll
>> > up for example when every output needs to be in its own "directory".
>> > On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
>> >  wrote:
>> > Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
>> > coalesce(1), you move everything in the RDD to a single partition, which
>> > then gives you 1 output file.
>> > It will still be called part-0 or something like that because that's
>> > defined by the Hadoop API that Spark uses for reading to/writing from
>> S3. I
>> > don't know of a way to change that.
>> >
>> >
>> > On Wed, Apr 30, 2014 at 2:47 PM, Peter  wrote:
>> >
>> > Ah, looks like RDD.coalesce(1) solves one part of the problem.
>> > On Wednesday, April 30, 2014 11:15 AM, Peter 
>> > wrote:
>> > Hi
>> >
>> > Playing around with Spark & S3, I'm opening multiple objects (CSV files)
>> > with:
>> >
>> >val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>> >
>> > so hfile is a RDD representing 10 objects that were "underneath"
>> 2014-04-28.
>> > After I've sorted and otherwise transformed the content, I'm trying to
>> write
>> > it back to a single object:
>> >
>> >
>> >
>> sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>> >
>> > unfortunately this results in a "folder" named concatted.csv with 10
>> objects
>> > underneath, part-0 .. part-00010, corresponding to the 10 original
>> > objects loaded.
>> >
>> > How can I achieve the desired behaviour of putting a single object named
>> > concatted.csv ?
>> >
>> > I've tried 0.9.1 and 1.0.0-RC3.
>> >
>> > Thanks!
>> > Peter
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>>
>>
>>
>


Re: Reading multiple S3 objects, transforming, writing back one

2014-05-01 Thread Nicholas Chammas
The fastest way to save to S3 should be to leave the RDD with many
partitions, because all partitions will be written out in parallel.

Then, once the various parts are in S3, somehow concatenate the files
together into one file.

If this can be done within S3 (I don't know if this is possible), then you
get the best of both worlds: a highly parallelized write to S3, and a
single cleanly named output file.
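
Poking around the AWS Java SDK, the multipart upload "copy part" calls might 
allow exactly that, entirely server-side. A rough, untested sketch, with bucket 
and key names made up, and with the caveat that every part except the last has 
to be at least 5 MB:

    import scala.collection.JavaConverters._
    import com.amazonaws.services.s3.AmazonS3Client
    import com.amazonaws.services.s3.model._

    val s3 = new AmazonS3Client()
    val bucket = "bucket"
    val parts = s3.listObjects(bucket, "tmp-output/").getObjectSummaries.asScala

    // start a multipart upload for the final, cleanly named object
    val init = s3.initiateMultipartUpload(
      new InitiateMultipartUploadRequest(bucket, "concatted.csv"))

    // copy each part file into the upload server-side (no download/re-upload)
    val etags = parts.zipWithIndex.map { case (obj, i) =>
      s3.copyPart(new CopyPartRequest()
        .withUploadId(init.getUploadId)
        .withSourceBucketName(bucket).withSourceKey(obj.getKey)
        .withDestinationBucketName(bucket).withDestinationKey("concatted.csv")
        .withPartNumber(i + 1)).getPartETag
    }

    s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
      bucket, "concatted.csv", init.getUploadId, etags.asJava))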


On Thu, May 1, 2014 at 12:52 PM, Peter  wrote:

> Thank you Patrick.
>
> I took a quick stab at it:
>
> val s3Client = new AmazonS3Client(...)
> val copyObjectResult = s3Client.copyObject("upload", outputPrefix +
> "/part-0", "rolled-up-logs", "2014-04-28.csv")
> val objectListing = s3Client.listObjects("upload", outputPrefix)
> s3Client.deleteObjects(new
> DeleteObjectsRequest("upload").withKeys(objectListing.getObjectSummaries.asScala.map(s
> => new KeyVersion(s.getKey)).asJava))
>
> Using a 3GB object I achieved about 33MB/s between buckets in the same AZ.
>
> This is a workable solution for the short term but not ideal for the
> longer term as data size increases. I understand it's a limitation of the
> Hadoop API but ultimately it must be possible to dump a RDD to a single S3
> object :)
>
>   On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell <
> pwend...@gmail.com> wrote:
>  This is a consequence of the way the Hadoop files API works. However,
> you can (fairly easily) add code to just rename the file because it
> will always produce the same filename.
>
> (heavy use of pseudo code)
>
> dir = "/some/dir"
> rdd.coalesce(1).saveAsTextFile(dir)
> f = new File(dir + "part-0")
> f.moveTo("somewhere else")
> dir.remove()
>
> It might be cool to add a utility called `saveAsSingleFile` or
> something that does this for you. In fact probably we should have
> called saveAsTextfile "saveAsTextFiles" to make it more clear...
>
> On Wed, Apr 30, 2014 at 2:00 PM, Peter  wrote:
> > Thanks Nicholas, this is a bit of a shame, not very practical for log
> roll
> > up for example when every output needs to be in its own "directory".
> > On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
> >  wrote:
> > Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
> > coalesce(1), you move everything in the RDD to a single partition, which
> > then gives you 1 output file.
> > It will still be called part-0 or something like that because that's
> > defined by the Hadoop API that Spark uses for reading to/writing from
> S3. I
> > don't know of a way to change that.
> >
> >
> > On Wed, Apr 30, 2014 at 2:47 PM, Peter  wrote:
> >
> > Ah, looks like RDD.coalesce(1) solves one part of the problem.
> > On Wednesday, April 30, 2014 11:15 AM, Peter 
> > wrote:
> > Hi
> >
> > Playing around with Spark & S3, I'm opening multiple objects (CSV files)
> > with:
> >
> >val hfile = sc.textFile("s3n://bucket/2014-04-28/")
> >
> > so hfile is a RDD representing 10 objects that were "underneath"
> 2014-04-28.
> > After I've sorted and otherwise transformed the content, I'm trying to
> write
> > it back to a single object:
> >
> >
> >
> sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
> >
> > unfortunately this results in a "folder" named concatted.csv with 10
> objects
> > underneath, part-0 .. part-00010, corresponding to the 10 original
> > objects loaded.
> >
> > How can I achieve the desired behaviour of putting a single object named
> > concatted.csv ?
> >
> > I've tried 0.9.1 and 1.0.0-RC3.
> >
> > Thanks!
> > Peter
> >
> >
> >
> >
> >
> >
> >
>
>
>


Re: Reading multiple S3 objects, transforming, writing back one

2014-05-01 Thread Peter
Thank you Patrick. 

I took a quick stab at it:

    import scala.collection.JavaConverters._
    import com.amazonaws.services.s3.AmazonS3Client
    import com.amazonaws.services.s3.model.{DeleteObjectsRequest, KeyVersion}

    val s3Client = new AmazonS3Client(...)
    // server-side copy of the single part file to its final, cleanly named key
    val copyObjectResult = s3Client.copyObject("upload", outputPrefix + "/part-0",
      "rolled-up-logs", "2014-04-28.csv")
    // then delete the original part files under the output prefix
    val objectListing = s3Client.listObjects("upload", outputPrefix)
    s3Client.deleteObjects(new DeleteObjectsRequest("upload")
      .withKeys(objectListing.getObjectSummaries.asScala.map(s => new KeyVersion(s.getKey)).asJava))

Using a 3GB object I achieved about 33MB/s between buckets in the same AZ. 

This is a workable solution for the short term but not ideal for the longer 
term as data size increases. I understand it's a limitation of the Hadoop API 
but ultimately it must be possible to dump a RDD to a single S3 object :) 

On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell  
wrote:
 
This is a consequence of the way the Hadoop files API works. However,
you can (fairly easily) add code to just rename the file because it
will always produce the same filename.

(heavy use of pseudo code)

dir = "/some/dir"
rdd.coalesce(1).saveAsTextFile(dir)
f = new File(dir + "part-0")
f.moveTo("somewhere else")
dir.remove()

It might be cool to add a utility called `saveAsSingleFile` or
something that does this for you. In fact probably we should have
called saveAsTextfile "saveAsTextFiles" to make it more clear...


On Wed, Apr 30, 2014 at 2:00 PM, Peter  wrote:
> Thanks Nicholas, this is a bit of a shame, not very practical for log roll
> up for example when every output needs to be in its own "directory".
> On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
>  wrote:
> Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
> coalesce(1), you move everything in the RDD to a single partition, which
> then gives you 1 output file.
> It will still be called part-0 or something like that because that's
> defined by the Hadoop API that Spark uses for reading to/writing from S3. I
> don't know of a way to change that.
>
>
> On Wed, Apr 30, 2014 at 2:47 PM, Peter  wrote:
>
> Ah, looks like RDD.coalesce(1) solves one part of the problem.
> On Wednesday, April 30, 2014 11:15 AM, Peter 
> wrote:
> Hi
>
> Playing around with Spark & S3, I'm opening multiple objects (CSV files)
> with:
>
>     val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>
> so hfile is a RDD representing 10 objects that were "underneath" 2014-04-28.
> After I've sorted and otherwise transformed the content, I'm trying to write
> it back to a single object:
>
>
> sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>
> unfortunately this results in a "folder" named concatted.csv with 10 objects
> underneath, part-0 .. part-00010, corresponding to the 10 original
> objects loaded.
>
> How can I achieve the desired behaviour of putting a single object named
> concatted.csv ?
>
> I've tried 0.9.1 and 1.0.0-RC3.
>
> Thanks!
> Peter
>
>
>
>
>
>
>

Re: Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Patrick Wendell
This is a consequence of the way the Hadoop files API works. However,
you can (fairly easily) add code to just rename the file because it
will always produce the same filename.

(heavy use of pseudo code)

dir = "/some/dir"
rdd.coalesce(1).saveAsTextFile(dir)
f = new File(dir + "/part-0")
f.moveTo("somewhere else")
dir.remove()

It might be cool to add a utility called `saveAsSingleFile` or
something that does this for you. In fact probably we should have
called saveAsTextFile "saveAsTextFiles" to make it more clear...
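
In real code that would look roughly like this (untested sketch, paths made up), 
using the Hadoop FileSystem API so the same thing works against hdfs:// or 
s3n:// paths:

    import org.apache.hadoop.fs.{FileSystem, Path}

    val dir = "s3n://bucket/tmp-concatted"
    rdd.coalesce(1).saveAsTextFile(dir)

    // the single partition is always written as part-00000 under the output dir
    val fs = FileSystem.get(new java.net.URI(dir), sc.hadoopConfiguration)
    fs.rename(new Path(dir + "/part-00000"), new Path("s3n://bucket/concatted.csv"))
    fs.delete(new Path(dir), true)   // drop the now-redundant output "directory"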

On Wed, Apr 30, 2014 at 2:00 PM, Peter  wrote:
> Thanks Nicholas, this is a bit of a shame, not very practical for log roll
> up for example when every output needs to be in its own "directory".
> On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
>  wrote:
> Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
> coalesce(1), you move everything in the RDD to a single partition, which
> then gives you 1 output file.
> It will still be called part-0 or something like that because that's
> defined by the Hadoop API that Spark uses for reading to/writing from S3. I
> don't know of a way to change that.
>
>
> On Wed, Apr 30, 2014 at 2:47 PM, Peter  wrote:
>
> Ah, looks like RDD.coalesce(1) solves one part of the problem.
> On Wednesday, April 30, 2014 11:15 AM, Peter 
> wrote:
> Hi
>
> Playing around with Spark & S3, I'm opening multiple objects (CSV files)
> with:
>
> val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>
> so hfile is a RDD representing 10 objects that were "underneath" 2014-04-28.
> After I've sorted and otherwise transformed the content, I'm trying to write
> it back to a single object:
>
>
> sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>
> unfortunately this results in a "folder" named concatted.csv with 10 objects
> underneath, part-0 .. part-00010, corresponding to the 10 original
> objects loaded.
>
> How can I achieve the desired behaviour of putting a single object named
> concatted.csv ?
>
> I've tried 0.9.1 and 1.0.0-RC3.
>
> Thanks!
> Peter
>
>
>
>
>
>
>


Re: Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Peter
Thanks Nicholas, this is a bit of a shame, not very practical for log roll up, 
for example when every output needs to be in its own "directory". 
On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas 
 wrote:
 
Yes, saveAsTextFile() will give you 1 part per RDD partition. When you 
coalesce(1), you move everything in the RDD to a single partition, which then 
gives you 1 output file. 
It will still be called part-0 or something like that because that’s 
defined by the Hadoop API that Spark uses for reading to/writing from S3. I 
don’t know of a way to change that.



On Wed, Apr 30, 2014 at 2:47 PM, Peter  wrote:

Ah, looks like RDD.coalesce(1) solves one part of the problem.
>On Wednesday, April 30, 2014 11:15 AM, Peter  wrote:
> 
>Hi
>
>
>Playing around with Spark & S3, I'm opening multiple objects (CSV files) with:
>
>
>    val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>
>
>so hfile is a RDD representing 10 objects that were "underneath" 2014-04-28. 
>After I've sorted and otherwise transformed the content, I'm trying to write 
>it back to a single object:
>
>
>    
>sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>
>
>unfortunately this results in a "folder" named concatted.csv with 10 objects 
>underneath, part-0 .. part-00010, corresponding to the 10 original objects 
>loaded. 
>
>
>How can I achieve the desired behaviour of putting a single object named 
>concatted.csv ?
>
>
>I've tried 0.9.1 and 1.0.0-RC3. 
>
>
>Thanks!
>Peter
>
>
>
>
>
>

Re: Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Nicholas Chammas
Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
coalesce(1), you move everything in the RDD to a single partition, which
then gives you 1 output file.

It will still be called part-0 or something like that because that’s
defined by the Hadoop API that Spark uses for reading to/writing from S3. I
don’t know of a way to change that.
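
For example (paths made up):

    // one part file per partition: out/part-00000, part-00001, ...
    rdd.saveAsTextFile("s3n://bucket/out/")

    // everything moved to a single partition, so a single part file: out2/part-00000
    rdd.coalesce(1).saveAsTextFile("s3n://bucket/out2/")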


On Wed, Apr 30, 2014 at 2:47 PM, Peter  wrote:

> Ah, looks like RDD.coalesce(1) solves one part of the problem.
>   On Wednesday, April 30, 2014 11:15 AM, Peter 
> wrote:
>  Hi
>
> Playing around with Spark & S3, I'm opening multiple objects (CSV files)
> with:
>
> val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>
> so hfile is a RDD representing 10 objects that were "underneath"
> 2014-04-28. After I've sorted and otherwise transformed the content, I'm
> trying to write it back to a single object:
>
>
> sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>
> unfortunately this results in a "folder" named concatted.csv with 10
> objects underneath, part-0 .. part-00010, corresponding to the 10
> original objects loaded.
>
> How can I achieve the desired behaviour of putting a single object named
> concatted.csv ?
>
> I've tried 0.9.1 and 1.0.0-RC3.
>
> Thanks!
> Peter
>
>
>
>
>


Re: Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Peter
Ah, looks like RDD.coalesce(1) solves one part of the problem.
On Wednesday, April 30, 2014 11:15 AM, Peter  wrote:
 
Hi

Playing around with Spark & S3, I'm opening multiple objects (CSV files) with:

    val hfile = sc.textFile("s3n://bucket/2014-04-28/")

so hfile is a RDD representing 10 objects that were "underneath" 2014-04-28. 
After I've sorted and otherwise transformed the content, I'm trying to write it 
back to a single object:

    
sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")

unfortunately this results in a "folder" named concatted.csv with 10 objects 
underneath, part-0 .. part-00010, corresponding to the 10 original objects 
loaded. 

How can I achieve the desired behaviour of putting a single object named 
concatted.csv ?

I've tried 0.9.1 and 1.0.0-RC3. 

Thanks!
Peter

Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Peter
Hi

Playing around with Spark & S3, I'm opening multiple objects (CSV files) with:

    val hfile = sc.textFile("s3n://bucket/2014-04-28/")

so hfile is a RDD representing 10 objects that were "underneath" 2014-04-28. 
After I've sorted and otherwise transformed the content, I'm trying to write it 
back to a single object:

    
sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")

unfortunately this results in a "folder" named concatted.csv with 10 objects 
underneath, part-0 .. part-00010, corresponding to the 10 original objects 
loaded. 

How can I achieve the desired behaviour of putting a single object named 
concatted.csv ?

I've tried 0.9.1 and 1.0.0-RC3. 

Thanks!
Peter