Chris,

To use s3distcp in this case, are you suggesting saving the RDD to local/ephemeral HDFS and then copying it up to S3 using this tool?

On Sat, May 3, 2014 at 7:14 PM, Chris Fregly <ch...@fregly.com> wrote:

> not sure if this directly addresses your issue, peter, but it's worth
> mentioning a handy AWS EMR utility called s3distcp that can upload a single
> HDFS file - in parallel - to a single, concatenated S3 file once all the
> partitions are uploaded. kinda cool.
>
> here's some info:
>
> http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html
>
> s3distcp is an extension of the familiar hadoop distcp, of course.
>
> On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>
>> The fastest way to save to S3 should be to leave the RDD with many
>> partitions, because all partitions will be written out in parallel.
>>
>> Then, once the various parts are in S3, somehow concatenate the files
>> together into one file.
>>
>> If this can be done within S3 (I don't know if this is possible), then
>> you get the best of both worlds: a highly parallelized write to S3, and a
>> single cleanly named output file.
>>
>> On Thu, May 1, 2014 at 12:52 PM, Peter <thenephili...@yahoo.com> wrote:
>>
>>> Thank you Patrick.
>>>
>>> I took a quick stab at it:
>>>
>>> val s3Client = new AmazonS3Client(...)
>>> val copyObjectResult = s3Client.copyObject("upload", outputPrefix + "/part-00000", "rolled-up-logs", "2014-04-28.csv")
>>> val objectListing = s3Client.listObjects("upload", outputPrefix)
>>> s3Client.deleteObjects(new DeleteObjectsRequest("upload").withKeys(objectListing.getObjectSummaries.asScala.map(s => new KeyVersion(s.getKey)).asJava))
>>>
>>> Using a 3GB object I achieved about 33MB/s between buckets in the same
>>> AZ.
>>>
>>> This is a workable solution for the short term but not ideal for the
>>> longer term as data size increases.
>>> I understand it's a limitation of the
>>> Hadoop API but ultimately it must be possible to dump an RDD to a single S3
>>> object :)
>>>
>>> On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell <pwend...@gmail.com> wrote:
>>> This is a consequence of the way the Hadoop files API works. However,
>>> you can (fairly easily) add code to just rename the file because it
>>> will always produce the same filename.
>>>
>>> (heavy use of pseudo code)
>>>
>>> dir = "/some/dir"
>>> rdd.coalesce(1).saveAsTextFile(dir)
>>> f = new File(dir + "/part-00000")
>>> f.moveTo("somewhere else")
>>> dir.remove()
>>>
>>> It might be cool to add a utility called `saveAsSingleFile` or
>>> something that does this for you. In fact probably we should have
>>> called saveAsTextFile "saveAsTextFiles" to make it more clear...
>>>
>>> On Wed, Apr 30, 2014 at 2:00 PM, Peter <thenephili...@yahoo.com> wrote:
>>> > Thanks Nicholas, this is a bit of a shame, not very practical for log roll
>>> > up for example when every output needs to be in its own "directory".
>>> > On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
>>> > <nicholas.cham...@gmail.com> wrote:
>>> > Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
>>> > coalesce(1), you move everything in the RDD to a single partition, which
>>> > then gives you 1 output file.
>>> > It will still be called part-00000 or something like that because that's
>>> > defined by the Hadoop API that Spark uses for reading from/writing to S3. I
>>> > don't know of a way to change that.
>>> >
>>> > On Wed, Apr 30, 2014 at 2:47 PM, Peter <thenephili...@yahoo.com> wrote:
>>> >
>>> > Ah, looks like RDD.coalesce(1) solves one part of the problem.
>>> > On Wednesday, April 30, 2014 11:15 AM, Peter <thenephili...@yahoo.com> wrote:
>>> > Hi
>>> >
>>> > Playing around with Spark & S3, I'm opening multiple objects (CSV files)
>>> > with:
>>> >
>>> > val hfile = sc.textFile("s3n://bucket/2014-04-28/")
>>> >
>>> > so hfile is an RDD representing 10 objects that were "underneath" 2014-04-28.
>>> > After I've sorted and otherwise transformed the content, I'm trying to write
>>> > it back to a single object:
>>> >
>>> > sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
>>> >
>>> > unfortunately this results in a "folder" named concatted.csv with 10 objects
>>> > underneath, part-00000 .. part-00009, corresponding to the 10 original
>>> > objects loaded.
>>> >
>>> > How can I achieve the desired behaviour of putting a single object named
>>> > concatted.csv ?
>>> >
>>> > I've tried 0.9.1 and 1.0.0-RC3.
>>> >
>>> > Thanks!
>>> > Peter
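
For reference, the "write many parts, then concatenate them into one cleanly named file" idea discussed in this thread can be sketched on a local filesystem. This is only an illustration under stated assumptions: it uses plain java.nio rather than S3 or HDFS, the names MergeParts and mergeParts are hypothetical, and it is not how s3distcp or Spark do it internally. It relies only on the part-NNNNN naming mentioned above, whose zero padding makes lexicographic order match partition order.

```scala
import java.io.File
import java.nio.file.{Files, Path}

// Hypothetical helper, not part of Spark or Hadoop.
object MergeParts {
  /** Concatenate every part-NNNNN file in `dir` into `target`, in part order.
    * Returns the number of part files merged. */
  def mergeParts(dir: Path, target: Path): Int = {
    // listFiles() returns null if `dir` is not a readable directory
    val parts = Option(dir.toFile.listFiles())
      .getOrElse(Array.empty[File])
      // skips markers such as _SUCCESS and hidden .crc files
      .filter(f => f.isFile && f.getName.startsWith("part-"))
      .sortBy(_.getName)

    // Default options: create the file, or truncate it if it already exists
    val out = Files.newOutputStream(target)
    try parts.foreach(p => Files.copy(p.toPath, out))
    finally out.close()
    parts.length
  }

  def main(args: Array[String]): Unit = {
    // Stand-ins for the files a multi-partition saveAsTextFile leaves behind
    val dir = Files.createTempDirectory("concatted")
    Files.write(dir.resolve("part-00000"), "a,1\nb,2\n".getBytes("UTF-8"))
    Files.write(dir.resolve("part-00001"), "c,3\n".getBytes("UTF-8"))

    val target = Files.createTempFile("2014-04-28", ".csv")
    val merged = mergeParts(dir, target)
    println(s"merged $merged parts into $target")
  }
}
```

On S3 the same ordering logic would apply, but the copy step would have to go through an S3 or Hadoop client rather than java.nio, and the cost of moving the data is the concern raised earlier in the thread.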