On 23 Apr 2017, at 19:49, Richard Hanson <rhan...@mailbox.org> wrote:


I have a streaming job which writes data to S3. I know there are saveAsXXXX 
functions that help write data to S3, but they bundle all the elements and 
then write them out to S3.

use Hadoop 2.8.x binaries and the fast output stream; this will stream up data 
in blocks of 5+MB (configurable), so eliminating/reducing the upload delay in 
the close(), and saving on disk space.

however, your new object isn't going to be visible until that close() call, and 
with the FS being eventually consistent, the list operation often visibly lags 
the actual object creation (or deletion, for that matter).
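
As a rough sketch of what switching on the fast output stream can look like from the Spark side: the fs.s3a.* names below are the Hadoop 2.8.x S3A options as I understand them, while the object name S3AFastUploadConf and the 8 MB block size are made up for illustration. Check the values against the S3A documentation for the Hadoop version you actually deploy.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper (the name is an assumption, not part of Spark or Hadoop).
object S3AFastUploadConf {
  // Spark copies every "spark.hadoop.*" entry into the Hadoop Configuration
  // with the prefix stripped, so these end up as fs.s3a.* settings.
  def apply(base: SparkConf = new SparkConf()): SparkConf =
    base
      // Upload blocks while the stream is still being written (Hadoop 2.8.x).
      .set("spark.hadoop.fs.s3a.fast.upload", "true")
      // Buffer pending blocks on local disk; "array" and "bytebuffer"
      // buffer in memory instead.
      .set("spark.hadoop.fs.s3a.fast.upload.buffer", "disk")
      // Size in bytes of each uploaded block; 8 MB is an illustrative value.
      .set("spark.hadoop.fs.s3a.multipart.size", (8 * 1024 * 1024).toString)
}
```

The resulting SparkConf would be passed to the SparkContext or StreamingContext at startup. None of this changes the visibility rule: the object still only appears once close() completes.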



So my first question: is there any way to let the saveAsXXXX functions write 
data in batches or as single elements instead of as a whole bundle?

Right now I use the S3 TransferManager to upload files in batches. The code 
looks like the below (sorry, I don't have the code at hand):

...

val manager = // initialize TransferManager...

stream.foreachRDD { rdd =>
  // collect() is an action: it pulls the whole RDD back to the driver
  val elements = rdd.collect
  manager.upload...(elements)
}

...


I suppose there would be a problem here because the TransferManager instance 
lives in the driver program (the job works now, but that may be because I run 
Spark as a single process). From what I can find on the internet, it seems to 
be recommended to use foreachPartition instead, and to avoid functions that 
trigger actions, such as rdd.collect. So, further questions: what is the best 
practice for this scenario (batch upload of transformed data to external 
storage such as S3)? And which functions cause an 'action' to be triggered 
(such as data being sent back to the driver program)?


once you've moved to the Hadoop 2.8 s3a client, you can just use save(path) on 
the dataframe to have it all done. S3A also manages sharing the transfer 
manager across all the workers in a process... it's trickier than you think, 
as you want to share the available upload bandwidth while giving some B/W to 
all threads generating output... more than one thread pool is used to handle 
this (see HADOOP-13286 for an example).
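
To make the save(path) suggestion concrete, here is a minimal sketch for a streaming job, assuming the stream carries plain strings. The object S3ASink, its method names, the "value" column and the batch-<time> directory layout are all hypothetical; only foreachRDD, toDF and write.save are standard Spark calls. The executors write their partitions directly, so nothing is collected to the driver and no TransferManager has to be shared by hand.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Hypothetical sink; names and directory layout are illustrative only.
object S3ASink {
  // Write one micro-batch as a DataFrame. save() uses the session's default
  // data source (Parquet unless configured otherwise) and fails if the
  // target path already exists.
  def saveBatch(rdd: RDD[String], path: String): Unit = {
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._
    rdd.toDF("value").write.save(path)
  }

  // Write every non-empty micro-batch to its own directory under basePath,
  // e.g. a placeholder such as "s3a://some-bucket/output".
  def saveStream(stream: DStream[String], basePath: String): Unit =
    stream.foreachRDD { (rdd: RDD[String], time: Time) =>
      // isEmpty() is itself an action, but it returns only a boolean
      // to the driver, not the data.
      if (!rdd.isEmpty()) saveBatch(rdd, s"$basePath/batch-${time.milliseconds}")
    }
}
```

One directory per batch sidesteps appending to existing paths, at the cost of many small outputs if the batch interval is short.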

getting those Hadoop 2.8.x binaries in is a bit tricky, because of transitive 
classpath pain; the SPARK-7481 patch shows how I do it
