Hi Pedro,

I did some experiments using one of our relatively small data sets. The data set is loaded into 3 or 4 data frames. I then call count().
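Each timed run boils down to something like the sketch below (Spark 1.x Java API, to match the streaming code further down in this thread). The input path, the JSON format, and the hard-coded 30 partitions are placeholders rather than the exact benchmark code; the "no coalesce" runs simply call count() directly on the data frame.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class CountBenchmark {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("countBenchmark"));
        SQLContext sqlContext = new SQLContext(sc);

        // args[0] is either an s3 prefix or an hdfs path, depending on the run
        DataFrame df = sqlContext.read().json(args[0]);

        // the "coalesce 30" runs reduce the partition count before counting;
        // the "no coalesce" runs call count() directly
        long n = df.coalesce(30).count();
        System.out.println("count = " + n);

        sc.stop();
    }
}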
Looks like using bigger files and reading from HDFS is a good solution for reading data. I guess I'll need to do something similar to deal with S3 write performance.

I think this could probably be tuned up a bit. I arbitrarily chose a max of 30 partitions for each data frame. When I combined files I checked that the combined file size was < 64 MB (64,000,000 bytes); however, in practice they are bigger (a rough sketch of the combining step follows the table).

Andy

execution time | src  | coalesce | file size | num files
39min 44s      | s3   | no       | small     | 270,518
32min 24s      | s3   | 30       | small     | 270,518
3min 09s       | HDFS | 30       | small     | 270,518
4min 24s       | HDFS | no       | small     | 270,518
2min 19s       | HDFS | no       | big       | 1,046
2min 06s       | HDFS | 30       | big       | 1,046
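For reference, the "big" file runs in the table came from combining the small JSON files into larger ones before loading them into HDFS with 'hadoop fs -put'. Below is a minimal sketch of one way to do that combining step; it is not the exact code I used, the directory arguments are placeholders, and it assumes each small file is newline-terminated JSON so the files can simply be appended to one another.

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;

public class ConcatSmallJsonFiles {
    static final long MAX_BYTES = 64_000_000L; // keep each combined file under ~64 MB

    public static void main(String[] args) throws IOException {
        File inputDir = new File(args[0]);   // directory of small JSON files (placeholder)
        File outputDir = new File(args[1]);  // directory for the combined files (placeholder)
        outputDir.mkdirs();

        int chunk = 0;
        long written = 0;
        OutputStream out = newChunk(outputDir, chunk);
        // assumes a flat input directory and newline-terminated JSON files
        for (File f : inputDir.listFiles()) {
            if (written > 0 && written + f.length() > MAX_BYTES) {
                out.close();                 // roll to a new combined file before exceeding the cap
                out = newChunk(outputDir, ++chunk);
                written = 0;
            }
            Files.copy(f.toPath(), out);     // append the small file to the current combined file
            written += f.length();
        }
        out.close();
        // the combined files can then be pushed to HDFS with 'hadoop fs -put'
    }

    private static OutputStream newChunk(File dir, int n) throws IOException {
        return new BufferedOutputStream(new FileOutputStream(new File(dir, "combined-" + n + ".json")));
    }
}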
From: Andrew Davidson <a...@santacruzintegration.com>
Date: Thursday, July 28, 2016 at 8:58 AM
To: Pedro Rodriguez <ski.rodrig...@gmail.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: performance problem when reading lots of small files created by spark streaming.

> Hi Pedro,
>
> Thanks for the explanation. I started watching your repo. In the short term I
> think I am going to try concatenating my small files into 64 MB files and using HDFS.
> My Spark Streaming app is implemented in Java and uses data frames; it writes to
> S3. My batch processing is written in Python; it reads data into data frames.
>
> It's probably a lot of work to make your solution work in these other
> contexts.
>
> Here is another use case you might be interested in.
> Writing multiple files to S3 is really slow. It causes a lot of problems for
> my streaming app. Bad things happen if your processing time exceeds your
> window length. Our streaming app must save all the input. For each mini batch
> we split the input into as many as 30 different data sets. Each one needs to
> be written to S3.
>
> As a temporary work-around I use an executor service to try to get more
> concurrent writes. Ideally the Spark framework would provide support for
> async IO, and hopefully the S3 performance issue would be improved.
>
> Here is my code if you are interested:
>
> // imports omitted; tags, tagCol, date, milliSeconds, createPath() and logger
> // are defined elsewhere in the class
> public class StreamingKafkaGnipCollector {
>     static final int POOL_SIZE = 30;
>     static ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);
>
>     private static void saveRawInput(SQLContext sqlContext,
>             JavaPairInputDStream<String, String> messages, String outputURIBase) {
>         JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
>             private static final long serialVersionUID = 1L;
>
>             @Override
>             public String call(Tuple2<String, String> tuple2) {
>                 //logger.warn("TODO _2:{}", tuple2._2);
>                 return tuple2._2();
>             }
>         });
>
>         lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
>             @Override
>             public void call(JavaRDD<String> jsonRDD, Time time) throws Exception {
>                 // activityDF is a DataFrame created from jsonRDD earlier in this method (omitted here)
>
>                 // df.write().json("s3://"); is very slow
>                 // run saves concurrently
>                 List<SaveData> saveData = new ArrayList<SaveData>(100);
>                 for (String tag : tags) {
>                     DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>                     String dirPath = createPath(outputURIBase, date, tag, milliSeconds);
>                     saveData.add(new SaveData(saveDF, dirPath));
>                 }
>
>                 saveImpl(saveData, executor); // concurrent writes to S3
>             }
>
>             private void saveImpl(List<SaveData> saveData, ExecutorService executor) {
>                 List<Future<?>> runningThreads = new ArrayList<Future<?>>(POOL_SIZE);
>                 for (SaveData data : saveData) {
>                     SaveWorker worker = new SaveWorker(data);
>                     Future<?> f = executor.submit(worker);
>                     runningThreads.add(f);
>                 }
>                 // wait for all the workers to complete
>                 for (Future<?> worker : runningThreads) {
>                     try {
>                         worker.get();
>                         logger.debug("worker completed");
>                     } catch (InterruptedException e) {
>                         logger.error("", e);
>                     } catch (ExecutionException e) {
>                         logger.error("", e);
>                     }
>                 }
>             }
>         });
>     }
>
>     static class SaveData {
>         private DataFrame df;
>         private String path;
>
>         SaveData(DataFrame df, String path) {
>             this.df = df;
>             this.path = path;
>         }
>     }
>
>     static class SaveWorker implements Runnable {
>         SaveData data;
>
>         public SaveWorker(SaveData data) {
>             this.data = data;
>         }
>
>         @Override
>         public void run() {
>             if (data.df.count() >= 1) {
>                 data.df.write().json(data.path);
>             }
>         }
>     }
> }
>
> From: Pedro Rodriguez <ski.rodrig...@gmail.com>
> Date: Wednesday, July 27, 2016 at 8:40 PM
> To: Andrew Davidson <a...@santacruzintegration.com>
> Cc: "user @spark" <user@spark.apache.org>
> Subject: Re: performance problem when reading lots of small files created by
> spark streaming.
>
>> There are a few blog posts that detail one possible/likely issue, for example:
>> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
>>
>> TLDR: The hadoop libraries Spark uses assume that their input comes from a
>> file system (works with HDFS); however, S3 is a key-value store, not a file
>> system. Somewhere along the line, this makes things very slow. Below I
>> describe their approach and a library I am working on to solve this problem.
>>
>> (Much) Longer Version (with a shiny new library in development):
>> So far in my reading of the source code, Hadoop attempts to actually read from
>> S3, which can be expensive, particularly since it does so from a single driver
>> core (listing files is different from actually reading them; I can find the
>> source code and link it later if you would like).
>> The concept explained above is to instead use the AWS SDK to list the files, then
>> distribute the file names as a collection with sc.parallelize, and then read them
>> in parallel. I found this worked, but it was lacking in a few ways, so I started
>> this project: https://github.com/EntilZha/spark-s3
>>
>> This takes that idea further by:
>> 1. Rather than sc.parallelize, implement the RDD interface where each
>> partition is defined by the files it needs to read (haven't gotten to
>> DataFrames yet)
>> 2. At the driver node, use the AWS SDK to list all the files with their size
>> (listing is fast), then run the Least Processing Time (LPT) algorithm to sift the
>> files into roughly balanced partitions by size
>> 3. API: S3Context(sc).textFileByPrefix("bucket", "file1",
>> "folder2").regularRDDOperationsHere or import implicits and do
>> sc.s3.textFileByPrefix
>>
>> At present, I am battle testing and benchmarking it at my current job, and the
>> results are promising, with significant improvements to jobs dealing with many
>> files (especially many small files) and to jobs whose input is unbalanced to
>> start with. Jobs perform better because 1) there isn't a long stall at the
>> driver while hadoop decides how to split S3 files, and 2) the partitions end up
>> nearly perfectly balanced because of the LPT algorithm.
>>
>> Since I hadn't intended to advertise this quite yet, the documentation is not
>> super polished, but it exists here:
>> http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context
>>
>> I am completing the Sonatype process for publishing artifacts on Maven
>> Central (this should be done by tomorrow, so referencing
>> "io.entilzha:spark-s3_2.10:0.0.0" should work very soon). I would love to
>> hear whether this library solution works; otherwise I hope the blog post above
>> is illuminating.
>>
>> Pedro
>>
>> On Wed, Jul 27, 2016 at 8:19 PM, Andy Davidson
>> <a...@santacruzintegration.com> wrote:
>>> I have a relatively small data set; however, it is split into many small JSON
>>> files. Each file is between maybe 4K and 400K.
>>> This is probably a very common issue for anyone using Spark Streaming. My
>>> streaming app works fine; however, my batch application takes several hours
>>> to run.
>>>
>>> All I am doing is calling count(). Currently I am trying to read the files
>>> from S3. When I look at the app UI it looks like Spark is blocked, probably
>>> on IO. Adding additional workers and memory does not improve performance.
>>>
>>> I am able to copy the files from S3 to a worker relatively quickly, so I do
>>> not think S3 read time is the problem.
>>>
>>> In the past, when I had similar data sets stored on HDFS, I was able to use
>>> coalesce() to reduce the number of partitions from 200K to 30. This made a
>>> big improvement in processing time. However, when I read from S3, coalesce()
>>> does not improve performance.
>>>
>>> I tried copying the files to a normal file system and then using 'hadoop fs
>>> -put' to copy the files to HDFS; however, this takes several hours and is
>>> nowhere near completion. It appears HDFS does not deal with small files well.
>>>
>>> I am considering copying the files from S3 to a normal file system on one of
>>> my workers, concatenating the files into a few much larger files, and then
>>> using 'hadoop fs -put' to move them to HDFS. Do you think this would
>>> improve the Spark count() performance issue?
>>>
>>> Does anyone know of heuristics for determining the number or size of the
>>> concatenated files?
>>>
>>> Thanks in advance
>>>
>>> Andy
>>
>>
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io <http://pedrorodriguez.io> | 909-353-4423
>> Github: github.com/EntilZha <http://github.com/EntilZha> | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>