Writing (or reading) small files from Spark to S3 can be seriously slow.

You'll get much higher throughput by doing a df.foreachPartition(partition =>
...) and, inside each partition, creating an AWS S3 client, then doing a
partition.foreach and uploading the files using that S3 client with its own
thread pool.

As long as you create the S3 client inside the foreachPartition and close it
after the partition.foreach(...) is done, you shouldn't have any issues.

Something roughly like this from the DStream docs:

  df.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
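
Adapted to the S3 case it might look roughly like this (an untested sketch, using
toJSON so each record is already a string; it assumes the AWS SDK for Java is on
the executor classpath, and the bucket name and key scheme are just placeholders):

  import java.util.UUID
  import com.amazonaws.services.s3.AmazonS3Client

  df.toJSON.foreachPartition { partition =>
    // One client per partition, created on the executor (S3 clients are not serializable)
    val s3 = new AmazonS3Client() // credentials come from the default provider chain
    try {
      partition.foreach { json =>
        // "my-bucket" and the key layout below are placeholders -- use your own scheme
        s3.putObject("my-bucket", s"events/${UUID.randomUUID()}.json", json)
      }
    } finally {
      s3.shutdown() // release the client once the partition is done
    }
  }

If a single client per partition still isn't fast enough, you can hand the
putObject calls to a small thread pool inside the partition to overlap the
uploads, as mentioned above.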

Hope this helps,
Ewan

-----Original Message-----
From: Cody Koeninger [mailto:c...@koeninger.org] 
Sent: 08 July 2016 15:31
To: Andy Davidson <a...@santacruzintegration.com>
Cc: user @spark <user@spark.apache.org>
Subject: Re: is dataframe.write() async? Streaming performance problem

Maybe obvious, but what happens when you change the S3 write to a println of
all the data?  That should identify whether the write is the issue.

count() and read.json() will involve additional tasks (a pass over the items in
the RDD to count them, and likewise to infer the schema), but for 300 records
that shouldn't be much of an issue.
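
For instance (an untested sketch against the write in Andy's loop below, where
saveDF is the per-tag DataFrame):

  // Diagnostic only: print the records instead of writing them to S3,
  // to see whether the S3 write itself is what's slow.
  saveDF.toJSON.collect().foreach(println)  // in place of saveDF.write().json(dirPath)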

On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson <a...@santacruzintegration.com> 
wrote:
> I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using
> the Kafka direct stream approach. I am running into performance problems.
> My processing time is greater than my window size. Changing window sizes,
> adding cores and executor memory does not change performance. I am
> having a lot of trouble identifying the problem from the metrics
> provided for streaming apps in the Spark application web UI.
>
> I think my performance problem has to do with writing the data to S3.
>
> My app receives very complicated JSON. My program is simple: it sorts
> the data into a small number of sets and writes each set as a separate S3 object.
> Each mini batch has at most 300 events, so I do not think shuffle
> is an issue.
>
> DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();
>
> … Explode tagCol …
>
> DataFrame rulesDF = activityDF.select(tagCol).distinct();
> Row[] rows = rulesDF.select(tagCol).collect();
> List<String> tags = new ArrayList<String>(100);
> for (Row row : rows) {
>     Object tag = row.get(0);
>     tags.add(tag.toString());
> }
>
>
> I think the for loop below is where the bottleneck is. Is write() async?
>
>
> If not, is there an easy way to vectorize/parallelize this for loop, or
> do I have to create the threads myself?
>
>
> Is creating threads in Spark a bad idea?
>
>
>
> for (String tag : tags) {
>     DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>     if (saveDF.count() >= 1) { // I do not think count() is an issue; it takes about 34 ms
>         String dirPath = "s3n://myBucket" + File.separator + date
>                 + File.separator + tag + File.separator + milliSeconds;
>         saveDF.write().json(dirPath);
>     }
> }
>
>
> Any suggestions would be greatly appreciated
>
>
> Andy
>
>
