Hi Pedro

Thanks for the explanation. I started watching your repo. In the short term I
think I am going to try concatenating my small files into roughly 64 MB files
and using HDFS. My Spark Streaming app is implemented in Java, uses DataFrames,
and writes to S3. My batch processing is written in Python and reads the data
into DataFrames.

It's probably a lot of work to get your solution working in these other
contexts.
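
Roughly what I have in mind for the concatenation step is the sketch below. It
is untested; the paths, the app name, and the target file count are
placeholders, and it assumes the Spark 1.6 Java DataFrame API:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class ConcatSmallJsonFiles {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ConcatSmallJsonFiles");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Placeholder paths; substitute the real S3 prefix and HDFS target.
        String input = "s3n://my-bucket/raw/2016/07/27/*.json";
        String output = "hdfs:///data/raw/2016/07/27";

        // Read the many small JSON files once, then rewrite them as a small
        // number of larger files. Aiming for ~64 MB per output file,
        // numFiles is roughly totalInputBytes / 64 MB; 30 is a placeholder.
        int numFiles = 30;
        DataFrame df = sqlContext.read().json(input);
        df.coalesce(numFiles).write().json(output);

        sc.stop();
    }
}

If it behaves like the HDFS case I mentioned in my earlier mail (coalescing
from 200K partitions down to 30), count() over the resulting files should be
much faster.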

Here is another use case you might be interested in.

Writing multiple files to S3 is really slow, and it causes a lot of problems
for my streaming app: bad things happen if your processing time exceeds your
window length. Our streaming app must save all of the input. For each mini
batch we split the input into as many as 30 different data sets, and each one
needs to be written to S3.

As a temporary workaround I use an executor service to try to get more
concurrent writes. Ideally the Spark framework would provide support for async
I/O, and hopefully the S3 performance issue will be improved. Here is my code
if you are interested.


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;

import scala.Tuple2;

public class StreamingKafkaGnipCollector {

    static final int POOL_SIZE = 30;
    static ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);

    // ... (logger, tagCol, createPath(), and other members/methods elided)

    private static void saveRawInput(SQLContext sqlContext,
            JavaPairInputDStream<String, String> messages, String outputURIBase) {

        // Pull the message payload out of each (key, value) Kafka tuple.
        JavaDStream<String> lines = messages.map(
                new Function<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(Tuple2<String, String> tuple2) {
                //logger.warn("TODO _2:{}", tuple2._2);
                return tuple2._2();
            }
        });

        lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
            @Override
            public void call(JavaRDD<String> jsonRDD, Time time) throws Exception {
                // ... (code that builds activityDF, date, tags, and
                //      milliSeconds from jsonRDD elided)

                // df.write().json("s3://"); is very slow
                // run saves concurrently
                List<SaveData> saveData = new ArrayList<SaveData>(100);
                for (String tag : tags) {
                    DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
                    String dirPath = createPath(outputURIBase, date, tag, milliSeconds);
                    saveData.add(new SaveData(saveDF, dirPath));
                }

                saveImpl(saveData, executor); // concurrent writes to S3
            }

            private void saveImpl(List<SaveData> saveData, ExecutorService executor) {
                List<Future<?>> runningThreads = new ArrayList<Future<?>>(POOL_SIZE);
                for (SaveData data : saveData) {
                    SaveWorker worker = new SaveWorker(data);
                    Future<?> f = executor.submit(worker);
                    runningThreads.add(f);
                }

                // wait for all the workers to complete
                for (Future<?> worker : runningThreads) {
                    try {
                        worker.get();
                        logger.debug("worker completed");
                    } catch (InterruptedException e) {
                        logger.error("", e);
                    } catch (ExecutionException e) {
                        logger.error("", e);
                    }
                }
            }
        });
    }

    static class SaveData {
        private DataFrame df;
        private String path;

        SaveData(DataFrame df, String path) {
            this.df = df;
            this.path = path;
        }
    }

    static class SaveWorker implements Runnable {
        SaveData data;

        public SaveWorker(SaveData data) {
            this.data = data;
        }

        @Override
        public void run() {
            // skip empty data sets so we do not write empty output to S3
            if (data.df.count() >= 1) {
                data.df.write().json(data.path);
            }
        }
    }
}



From:  Pedro Rodriguez <ski.rodrig...@gmail.com>
Date:  Wednesday, July 27, 2016 at 8:40 PM
To:  Andrew Davidson <a...@santacruzintegration.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: performance problem when reading lots of small files created
by spark streaming.

> There are a few blog posts that detail one possible/likely issue for example:
> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
> 
> TLDR: The Hadoop libraries Spark uses assume that their input comes from a
> file system (which works for HDFS); however, S3 is a key-value store, not a
> file system. Somewhere along the line, this makes things very slow. Below I
> describe their approach and a library I am working on to solve this problem.
> 
> (Much) Longer Version (with a shiny new library in development):
> So far in my reading of the source code, Hadoop attempts to actually read
> from S3, which can be expensive, particularly since it does so from a single
> driver core (listing files is different from actually reading them; I can
> find the source code and link it later if you would like). The concept
> explained above is to instead use the AWS SDK to list the files, distribute
> the file names as a collection with sc.parallelize, and then read them in
> parallel. I found this worked, but found it lacking in a few ways, so I
> started this project:
> https://github.com/EntilZha/spark-s3
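
If I follow the list-then-parallelize idea above correctly, it is roughly:
list the keys with the AWS SDK on the driver, parallelize the list of key
names, then read each object inside the tasks. A minimal, untested sketch of
that idea in Java (this is not code from spark-s3; the class name, the helper
method, and the partition sizing are made up, and it assumes the AWS Java SDK
v1):

import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.util.IOUtils;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class ListThenParallelize {

    // Returns an RDD of file contents for every key under bucket/prefix.
    public static JavaRDD<String> textByPrefix(JavaSparkContext sc,
            final String bucket, String prefix) {

        // 1. List the keys on the driver (listing is fast, reading is not).
        AmazonS3 s3 = new AmazonS3Client();
        List<String> keys = new ArrayList<String>();
        ObjectListing listing = s3.listObjects(
                new ListObjectsRequest().withBucketName(bucket).withPrefix(prefix));
        while (true) {
            for (S3ObjectSummary summary : listing.getObjectSummaries()) {
                keys.add(summary.getKey());
            }
            if (!listing.isTruncated()) {
                break;
            }
            listing = s3.listNextBatchOfObjects(listing);
        }

        // 2. Distribute the key names and read each object inside the tasks.
        int numPartitions = Math.max(1, keys.size() / 1000); // arbitrary sizing
        return sc.parallelize(keys, numPartitions).map(
                new Function<String, String>() {
                    @Override
                    public String call(String key) throws Exception {
                        // One client per record is wasteful; a real version
                        // would reuse a client per partition.
                        AmazonS3 client = new AmazonS3Client();
                        return IOUtils.toString(
                                client.getObject(bucket, key).getObjectContent());
                    }
                });
    }
}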
> 
> This takes that idea further by:
> 1. Rather than sc.parallelize, implement the RDD interface where each
> partition is defined by the files it needs to read (haven't gotten to
> DataFrames yet)
> 2. At the driver node, use the AWS SDK to list all the files with their size
> (listing is fast), then run the Least Processing Time Algorithm to sift the
> files into roughly balanced partitions by size
> 3. API: S3Context(sc).textFileByPrefix("bucket", "file1",
> "folder2").regularRDDOperationsHere or import implicits and do
> sc.s3.textFileByPrefix
> 
> At present, I am battle testing and benchmarking it at my current job, and
> the results are promising, with significant improvements to jobs dealing
> with many files (especially many small files) and to jobs whose input is
> unbalanced to start with. Jobs perform better because: 1) there isn't a long
> stall at the driver while Hadoop decides how to split the S3 files, and 2)
> the partitions end up nearly perfectly balanced because of the LPT algorithm.
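
For what it's worth, my understanding of the LPT heuristic mentioned above is:
sort the files largest first and always hand the next file to the partition
with the smallest total size so far. An illustrative sketch (not code from
spark-s3; all names are made up):

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class LptPartitioner {

    // Simple (key, size-in-bytes) pair describing one S3 object.
    static class FileInfo {
        final String key;
        final long size;
        FileInfo(String key, long size) { this.key = key; this.size = size; }
    }

    // One partition being built up: its files and its running total size.
    static class Bucket {
        final List<FileInfo> files = new ArrayList<FileInfo>();
        long totalSize = 0;
    }

    // Greedy LPT: process files largest first, always adding the next file
    // to the bucket with the smallest total so far, which yields partitions
    // that are roughly balanced by byte size.
    static List<List<FileInfo>> assign(List<FileInfo> files, int numPartitions) {
        List<FileInfo> sorted = new ArrayList<FileInfo>(files);
        Collections.sort(sorted, new Comparator<FileInfo>() {
            @Override
            public int compare(FileInfo a, FileInfo b) {
                return Long.compare(b.size, a.size); // descending by size
            }
        });

        PriorityQueue<Bucket> buckets = new PriorityQueue<Bucket>(numPartitions,
                new Comparator<Bucket>() {
                    @Override
                    public int compare(Bucket a, Bucket b) {
                        return Long.compare(a.totalSize, b.totalSize);
                    }
                });
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new Bucket());
        }

        for (FileInfo f : sorted) {
            Bucket smallest = buckets.poll(); // least loaded bucket
            smallest.files.add(f);
            smallest.totalSize += f.size;
            buckets.add(smallest);
        }

        List<List<FileInfo>> partitions = new ArrayList<List<FileInfo>>();
        for (Bucket b : buckets) {
            partitions.add(b.files);
        }
        return partitions;
    }
}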
> 
> Since I hadn't intended to advertise this quite yet, the documentation is not
> super polished, but it exists here:
> http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context
> 
> I am completing the Sonatype process for publishing artifacts on Maven
> Central (this should be done by tomorrow, so referencing
> "io.entilzha:spark-s3_2.10:0.0.0" should work very soon). I would love to
> hear if this library solution works; otherwise, I hope the blog post above is
> illuminating.
> 
> Pedro
> 
> On Wed, Jul 27, 2016 at 8:19 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> I have a relatively small data set; however, it is split into many small
>> JSON files, each between maybe 4K and 400K. This is probably a very common
>> issue for anyone using Spark Streaming. My streaming app works fine;
>> however, my batch application takes several hours to run.
>> 
>> All I am doing is calling count(). Currently I am trying to read the files
>> from S3. When I look at the app UI, it looks like Spark is blocked, probably
>> on IO. Adding additional workers and memory does not improve performance.
>> 
>> I am able to copy the files from s3 to a worker relatively quickly. So I do
>> not think s3 read time is the problem.
>> 
>> In the past, when I had similar data sets stored on HDFS, I was able to use
>> coalesce() to reduce the number of partitions from 200K to 30. This made a
>> big improvement in processing time. However, when I read from S3, coalesce()
>> does not improve performance.
>> 
>> I tried copying the files to a normal file system and then using 'hadoop fs
>> put' to copy the files to HDFS; however, this takes several hours and is
>> nowhere near completion. It appears HDFS does not deal with small files
>> well.
>> 
>> I am considering copying the files from S3 to a normal file system on one of
>> my workers, concatenating the files into a few much larger files, and then
>> using 'hadoop fs put' to move them to HDFS. Do you think this would improve
>> the Spark count() performance issue?
>> 
>> Does anyone know of heuristics for determining the number or size of the
>> concatenated files?
>> 
>> Thanks in advance
>> 
>> Andy
> 
> 
> 
> -- 
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
> 
> ski.rodrig...@gmail.com | pedrorodriguez.io <http://pedrorodriguez.io>  |
> 909-353-4423
> Github: github.com/EntilZha <http://github.com/EntilZha>  | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
> 

