Hi Pedro,

These are much more accurate performance numbers.

In total I have 5,671,287 rows. Each row is stored as JSON. The JSON is very complicated and can be up to 4K per row. I arbitrarily picked 30 partitions. My "big" files are at most 64MB.

execution time     | src  | coalesce(num) | file size | num files
34min 54s          | s3   | False         | small     | 5,970
32min 24s (errors) | s3   | 30            | small     | 5,970
13min 13s          | HDFS | False         | small     | 5,970
09min 42s          | HDFS | 30            | small     | 5,970
04min 29s          | HDFS | False         | big       | 1,046
04min 44s          | HDFS | 30            | big       | 1,046
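For reference, each timed run above boils down to something like the following. This is a minimal sketch against the Spark 1.6 Java API; the class name, app name, and paths are hypothetical.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class CountBenchmark {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("countBenchmark"));
        SQLContext sqlContext = new SQLContext(sc);

        // src=HDFS, file size=small: read the many-small-file data set directly.
        // For the s3 runs the path would be an s3n:// URI instead.
        DataFrame df = sqlContext.read().json("hdfs:///data/small/");

        // coalesce(30) collapses the thousands of input partitions into 30;
        // the coalesce(num)=False rows in the tables skip this step.
        long n = df.coalesce(30).count();
        System.out.println("row count: " + n);

        sc.stop();
    }
}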
From: Andrew Davidson <a...@santacruzintegration.com>
Date: Friday, July 29, 2016 at 1:04 PM
To: Andrew Davidson <a...@santacruzintegration.com>, Pedro Rodriguez <ski.rodrig...@gmail.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: use big files and read from HDFS was: performance problem when reading lots of small files created by spark streaming.

> Hi Pedro
>
> I did some experiments using one of our relatively small data sets. The data set is loaded into 3 or 4 data frames. I then call count().
>
> Looks like using bigger files and reading from HDFS is a good solution for reading data. I guess I'll need to do something similar to this to deal with S3 write performance.
>
> I think this could probably be tuned up a bit. I arbitrarily chose a max of 30 partitions for each data frame. When I combined files I checked that the combined file size was < 64MB (64,000,000 bytes), however in practice they are bigger.
>
> Andy
>
> execution time | src  | coalesce(num) | file size | num files
> 39min 44s      | s3   | False         | small     | 270,518
> 32min 24s      | s3   | 30            | small     | 270,518
> 3min 09s       | HDFS | 30            | small     | 270,518
> 4min 24s       | HDFS | False         | small     | 270,518
> 2min 19s       | HDFS | False         | big       | 1,046
> 2min 06s       | HDFS | 30            | big       | 1,046
>
> From: Andrew Davidson <a...@santacruzintegration.com>
> Date: Thursday, July 28, 2016 at 8:58 AM
> To: Pedro Rodriguez <ski.rodrig...@gmail.com>
> Cc: "user @spark" <user@spark.apache.org>
> Subject: Re: performance problem when reading lots of small files created by spark streaming.
>
>> Hi Pedro
>>
>> Thanks for the explanation. I started watching your repo. In the short term I think I am going to try concatenating my small files into 64MB files and using HDFS. My Spark streaming app is implemented in Java and uses data frames; it writes to S3. My batch processing is written in Python; it reads data into data frames.
>>
>> It's probably a lot of work to make your solution work in these other contexts.
>>
>> Here is another use case you might be interested in: writing multiple files to S3 is really slow. It causes a lot of problems for my streaming app. Bad things happen if your processing time exceeds your window length. Our streaming app must save all the input. For each mini batch we split the input into as many as 30 different data sets. Each one needs to be written to S3.
>>
>> As a temporary workaround I use an executor service to try and get more concurrent writes. Ideally the Spark framework would provide support for async IO, and hopefully the S3 performance issue would be improved.
>> Here is my code if you are interested:
>>
>> public class StreamingKafkaGnipCollector {
>>     static final int POOL_SIZE = 30;
>>     static ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);
>>
>>     // tags, tagCol, date, milliSeconds, createPath(), activityDF and logger
>>     // come from surrounding code that is elided here.
>>     private static void saveRawInput(SQLContext sqlContext,
>>             JavaPairInputDStream<String, String> messages, String outputURIBase) {
>>         JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
>>             private static final long serialVersionUID = 1L;
>>
>>             @Override
>>             public String call(Tuple2<String, String> tuple2) {
>>                 //logger.warn("TODO _2:{}", tuple2._2);
>>                 return tuple2._2();
>>             }
>>         });
>>
>>         lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
>>             @Override
>>             public void call(JavaRDD<String> jsonRDD, Time time) throws Exception {
>>                 // df.write().json("s3://..."); is very slow, so run the saves concurrently
>>                 List<SaveData> saveData = new ArrayList<SaveData>(100);
>>                 for (String tag : tags) {
>>                     DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>>                     String dirPath = createPath(outputURIBase, date, tag, milliSeconds);
>>                     saveData.add(new SaveData(saveDF, dirPath));
>>                 }
>>                 saveImpl(saveData, executor); // concurrent writes to S3
>>             }
>>
>>             private void saveImpl(List<SaveData> saveData, ExecutorService executor) {
>>                 List<Future<?>> runningThreads = new ArrayList<Future<?>>(POOL_SIZE);
>>                 for (SaveData data : saveData) {
>>                     SaveWorker worker = new SaveWorker(data);
>>                     Future<?> f = executor.submit(worker);
>>                     runningThreads.add(f);
>>                 }
>>                 // wait for all the workers to complete
>>                 for (Future<?> worker : runningThreads) {
>>                     try {
>>                         worker.get();
>>                         logger.debug("worker completed");
>>                     } catch (InterruptedException e) {
>>                         logger.error("", e);
>>                     } catch (ExecutionException e) {
>>                         logger.error("", e);
>>                     }
>>                 }
>>             }
>>         });
>>     }
>>
>>     static class SaveData {
>>         private DataFrame df;
>>         private String path;
>>
>>         SaveData(DataFrame df, String path) {
>>             this.df = df;
>>             this.path = path;
>>         }
>>     }
>>
>>     static class SaveWorker implements Runnable {
>>         SaveData data;
>>
>>         public SaveWorker(SaveData data) {
>>             this.data = data;
>>         }
>>
>>         @Override
>>         public void run() {
>>             // skip empty data frames; count() forces evaluation
>>             if (data.df.count() >= 1) {
>>                 data.df.write().json(data.path);
>>             }
>>         }
>>     }
>> }
>>
>> From: Pedro Rodriguez <ski.rodrig...@gmail.com>
>> Date: Wednesday, July 27, 2016 at 8:40 PM
>> To: Andrew Davidson <a...@santacruzintegration.com>
>> Cc: "user @spark" <user@spark.apache.org>
>> Subject: Re: performance problem when reading lots of small files created by spark streaming.
>>
>>> There are a few blog posts that detail one possible/likely issue, for example:
>>> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
>>>
>>> TLDR: The Hadoop libraries Spark uses assume that their input comes from a file system (works with HDFS); however, S3 is a key value store, not a file system. Somewhere along the line, this makes things very slow. Below I describe their approach and a library I am working on to solve this problem.
>>>
>>> (Much) Longer Version (with a shiny new library in development):
>>> So far in my reading of the source code, Hadoop attempts to actually read from S3, which can be expensive, particularly since it does so from a single driver core (listing files is different from actually reading them; I can find the source code and link it later if you would like). The concept explained above is to instead use the AWS SDK to list files, then distribute the file names as a collection with sc.parallelize, then read them in parallel. I found this worked, but it was lacking in a few ways, so I started this project: https://github.com/EntilZha/spark-s3
>>>
>>> This takes that idea further by:
>>> 1. Rather than sc.parallelize, implement the RDD interface where each partition is defined by the files it needs to read (haven't gotten to DataFrames yet)
>>> 2. At the driver node, use the AWS SDK to list all the files with their size (listing is fast), then run the Longest Processing Time (LPT) algorithm to sift the files into roughly balanced partitions by size
>>> 3. API: S3Context(sc).textFileByPrefix("bucket", "file1", "folder2").regularRDDOperationsHere or import implicits and do sc.s3.textFileByPrefix
>>>
>>> At present, I am battle testing and benchmarking it at my current job, and results are promising, with significant improvements to jobs dealing with many files, especially many small files, and to jobs whose input is unbalanced to start with. Jobs perform better because: 1) there isn't a long stall at the driver when Hadoop decides how to split S3 files, and 2) the partitions end up nearly perfectly balanced because of the LPT algorithm.
>>>
>>> Since I hadn't intended to advertise this quite yet, the documentation is not super polished, but it exists here: http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context
>>>
>>> I am completing the Sonatype process for publishing artifacts on Maven Central (this should be done by tomorrow, so referencing "io.entilzha:spark-s3_2.10:0.0.0" should work very soon). I would love to hear if this library solution works; otherwise I hope the blog post above is illuminating.
>>>
>>> Pedro
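The list-then-parallelize approach Pedro describes above looks roughly like the following. This is only a sketch, not the spark-s3 implementation: the class and method names are made up, it assumes the Spark 1.6 Java API (where FlatMapFunction returns an Iterable) and the AWS Java SDK v1, and it buffers each partition's lines in memory.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;

public class S3ParallelRead {

    // List every key under a prefix, with sizes. Listing is cheap; reading is not.
    static List<S3ObjectSummary> listAll(AmazonS3 s3, String bucket, String prefix) {
        List<S3ObjectSummary> all = new ArrayList<S3ObjectSummary>();
        ObjectListing listing = s3.listObjects(bucket, prefix);
        all.addAll(listing.getObjectSummaries());
        while (listing.isTruncated()) {
            listing = s3.listNextBatchOfObjects(listing);
            all.addAll(listing.getObjectSummaries());
        }
        return all;
    }

    // LPT: take files largest first, always assigning the next file to the
    // partition with the smallest running byte total.
    static List<List<String>> lptPartition(List<S3ObjectSummary> files, int numPartitions) {
        Collections.sort(files, new Comparator<S3ObjectSummary>() {
            @Override
            public int compare(S3ObjectSummary a, S3ObjectSummary b) {
                return Long.compare(b.getSize(), a.getSize()); // descending by size
            }
        });
        long[] totals = new long[numPartitions];
        List<List<String>> parts = new ArrayList<List<String>>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<String>());
        }
        for (S3ObjectSummary f : files) {
            int min = 0;
            for (int i = 1; i < numPartitions; i++) {
                if (totals[i] < totals[min]) {
                    min = i;
                }
            }
            totals[min] += f.getSize();
            parts.get(min).add(f.getKey());
        }
        return parts;
    }

    static JavaRDD<String> textByPrefix(JavaSparkContext jsc, final String bucket,
            String prefix, int numPartitions) {
        // Credentials come from the default AWS provider chain.
        AmazonS3 s3 = new AmazonS3Client();
        List<List<String>> parts = lptPartition(listAll(s3, bucket, prefix), numPartitions);
        // One RDD element per partition: the list of keys that partition will read.
        JavaRDD<List<String>> keyLists = jsc.parallelize(parts, parts.size());
        return keyLists.flatMap(new FlatMapFunction<List<String>, String>() {
            @Override
            public Iterable<String> call(List<String> keys) throws Exception {
                AmazonS3 client = new AmazonS3Client(); // created on the executor
                List<String> lines = new ArrayList<String>();
                for (String key : keys) {
                    BufferedReader reader = new BufferedReader(new InputStreamReader(
                            client.getObject(bucket, key).getObjectContent(),
                            StandardCharsets.UTF_8));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        lines.add(line);
                    }
                    reader.close();
                }
                return lines;
            }
        });
    }
}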
>>> On Wed, Jul 27, 2016 at 8:19 PM, Andy Davidson <a...@santacruzintegration.com> wrote:
>>>> I have a relatively small data set; however, it is split into many small JSON files. Each file is between maybe 4K and 400K.
>>>> This is probably a very common issue for anyone using Spark streaming. My streaming app works fine; however, my batch application takes several hours to run.
>>>>
>>>> All I am doing is calling count(). Currently I am trying to read the files from S3. When I look at the app UI it looks like Spark is blocked, probably on IO. Adding additional workers and memory does not improve performance.
>>>>
>>>> I am able to copy the files from S3 to a worker relatively quickly, so I do not think S3 read time is the problem.
>>>>
>>>> In the past, when I had similar data sets stored on HDFS, I was able to use coalesce() to reduce the number of partitions from 200K to 30. This made a big improvement in processing time. However, when I read from S3, coalesce() does not improve performance.
>>>>
>>>> I tried copying the files to a normal file system and then using 'hadoop fs -put' to copy the files to HDFS; however, this takes several hours and is nowhere near completion. It appears HDFS does not deal with small files well.
>>>>
>>>> I am considering copying the files from S3 to a normal file system on one of my workers, then concatenating the files into a few much larger files, and then using 'hadoop fs -put' to move them to HDFS. Do you think this would improve the Spark count() performance issue?
>>>>
>>>> Does anyone know of heuristics for determining the number or size of the concatenated files?
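One common rule of thumb for that question: size the combined files to the HDFS block size (64MB in the experiments above; 128MB on Hadoop 2 clusters), which makes the file count a ceiling division. A hypothetical helper:

// Pick a file count so each combined file lands near the HDFS block size.
static long numCombinedFiles(long totalBytes, long targetBytes) {
    return (totalBytes + targetBytes - 1) / targetBytes; // ceiling division
}

// For example, 30 GB of input (a made-up figure) with a 64 MB target:
// numCombinedFiles(30000000000L, 64000000L) == 469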
>>>> Thanks in advance
>>>>
>>>> Andy
>>>
>>> --
>>> Pedro Rodriguez
>>> PhD Student in Distributed Machine Learning | CU Boulder
>>> UC Berkeley AMPLab Alumni
>>>
>>> ski.rodrig...@gmail.com | pedrorodriguez.io <http://pedrorodriguez.io> | 909-353-4423
>>> Github: github.com/EntilZha <http://github.com/EntilZha> | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience