Hi Ewan,

Currently I split my dataframe into n smaller dataframes and call write().json("s3://..."). Each dataframe becomes a single S3 object. I assume that for your solution to work I would need to repartition(1) each of the smaller sets so that each one is still written as a single S3 object.

I am also considering using a Java ExecutorService and thread pool. It's easy to do. Each thread would call df.write().json("s3://..."). One advantage of this is that I do not need to make any assumptions about how Spark is implemented. I assume the thread pool runs on the driver, so the slaves do not incur any extra overhead.
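A rough sketch of that thread-pool approach (the pool size, error handling, and waiting on the futures are placeholders, not tested code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Run the per-tag writes concurrently from the driver.
// The pool size (8) is a placeholder.
ExecutorService pool = Executors.newFixedThreadPool(8);
List<Future<?>> futures = new ArrayList<Future<?>>();

for (String tag : tags) {
    final DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
    final String dirPath = "s3n://myBucket" + File.separator + date
            + File.separator + tag + File.separator + milliSeconds;

    futures.add(pool.submit(new Runnable() {
        @Override
        public void run() {
            // Each write() submits a normal Spark job; the thread just blocks
            // on the driver while the cluster does the work.
            saveDF.write().json(dirPath);
        }
    }));
}

// Block until every write has finished before ending the micro-batch.
for (Future<?> f : futures) {
    try {
        f.get();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
pool.shutdown();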
Thanks,

Andy

From: Ewan Leith <ewan.le...@realitymine.com>
Date: Friday, July 8, 2016 at 8:52 AM
To: Cody Koeninger <c...@koeninger.org>, Andrew Davidson <a...@santacruzintegration.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: RE: is dataframe.write() async? Streaming performance problem

> Writing (or reading) small files from Spark to S3 can be seriously slow.
>
> You'll get much higher throughput by doing a df.foreachPartition(partition =>
> ...) and, inside each partition, creating an AWS S3 client, then doing a
> partition.foreach and uploading the files using that S3 client with its own
> threadpool.
>
> As long as you create the S3 client inside the foreachPartition, and close it
> after the partition.foreach(...) is done, you shouldn't have any issues.
>
> Something roughly like this, from the DStream docs:
>
> df.foreachPartition { partitionOfRecords =>
>   val connection = createNewConnection()
>   partitionOfRecords.foreach(record => connection.send(record))
>   connection.close()
> }
>
> Hope this helps,
> Ewan
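A rough Java version of the same per-partition upload pattern (the bucket name, key scheme, and the choice to upload one object per record are placeholders, not something prescribed in the thread):

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.UUID;

import org.apache.spark.api.java.function.VoidFunction;

import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;

// toJSON() turns each Row into a JSON string; foreachPartition runs on the executors.
activityDF.toJSON().toJavaRDD().foreachPartition(new VoidFunction<Iterator<String>>() {
    @Override
    public void call(Iterator<String> partitionOfRecords) throws Exception {
        // Create the S3 client inside the partition so it is built on the executor,
        // never serialized from the driver.
        AmazonS3Client s3 = new AmazonS3Client();
        try {
            while (partitionOfRecords.hasNext()) {
                String json = partitionOfRecords.next();
                byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
                ObjectMetadata meta = new ObjectMetadata();
                meta.setContentLength(bytes.length);
                // Placeholder key scheme: one object per record.
                String key = "myPrefix/" + UUID.randomUUID() + ".json";
                s3.putObject("myBucket", key, new ByteArrayInputStream(bytes), meta);
            }
        } finally {
            // Release the client once the partition is done.
            s3.shutdown();
        }
    }
});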
>
> -----Original Message-----
> From: Cody Koeninger [mailto:c...@koeninger.org]
> Sent: 08 July 2016 15:31
> To: Andy Davidson <a...@santacruzintegration.com>
> Cc: user @spark <user@spark.apache.org>
> Subject: Re: is dataframe.write() async? Streaming performance problem
>
> Maybe obvious, but what happens when you change the S3 write to a println of
> all the data? That should identify whether the write is the issue.
>
> count() and read.json() will involve additional tasks (run through the items
> in the rdd to count them, likewise to infer the schema), but for 300 records
> that shouldn't be much of an issue.
>
> On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson <a...@santacruzintegration.com> wrote:
>> I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using the
>> Kafka direct stream approach. I am running into performance problems.
>> My processing time is greater than my window size. Changing window sizes,
>> adding cores, and adding executor memory does not change performance. I am
>> having a lot of trouble identifying the problem by looking at the metrics
>> provided for streaming apps in the Spark application web UI.
>>
>> I think my performance problem has to do with writing the data to S3.
>>
>> My app receives very complicated JSON. My program is simple: it sorts
>> the data into a small number of sets and writes each set as a separate S3
>> object. The mini-batch data has at most 300 events, so I do not think
>> shuffle is an issue.
>>
>> DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();
>>
>> // explode tagCol
>>
>> DataFrame rulesDF = activityDF.select(tagCol).distinct();
>> Row[] rows = rulesDF.select(tagCol).collect();
>>
>> List<String> tags = new ArrayList<String>(100);
>> for (Row row : rows) {
>>     Object tag = row.get(0);
>>     tags.add(tag.toString());
>> }
>>
>> I think the for loop below is where the bottleneck is. Is write() async?
>>
>> If not, is there an easy way to vectorize/parallelize this for loop, or do
>> I have to create the threads myself?
>>
>> Is creating threads in Spark a bad idea?
>>
>> for (String tag : tags) {
>>     DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>>
>>     // I do not think count() is an issue; it takes about 34 ms
>>     if (saveDF.count() >= 1) {
>>         String dirPath = "s3n://myBucket" + File.separator + date
>>                 + File.separator + tag + File.separator + milliSeconds;
>>         saveDF.write().json(dirPath);
>>     }
>> }
>>
>> Any suggestions would be greatly appreciated.
>>
>> Andy
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org